/*	$NetBSD: rf_parityloggingdags.c,v 1.2 1999/01/26 02:34:00 oster Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: William V. Courtright II
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

#include "rf_archs.h"

#if RF_INCLUDE_PARITYLOGGING > 0

/*
 * DAGs specific to parity logging are created here.
 */

#include "rf_types.h"
#include "rf_raid.h"
#include "rf_dag.h"
#include "rf_dagutils.h"
#include "rf_dagfuncs.h"
#include "rf_threadid.h"
#include "rf_debugMem.h"
#include "rf_paritylog.h"
#include "rf_memchunk.h"
#include "rf_general.h"

#include "rf_parityloggingdags.h"

/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *           / Rod \           / Wnd \
 * H -- NIL- Rod - NIL - Wnd ------- NIL - T
 *           \ Rod /           \ Xor - Lpo /
 *
 * The writes are not done until the reads complete because if they were done
 * in parallel, a failure on one of the reads could leave the parity in an
 * inconsistent state, so that the retry with a new DAG would produce
 * erroneous parity.
 *
 * Note: this DAG has the nasty property that none of the buffers allocated
 * for reading old data can be freed until the XOR node fires.  Need to fix
 * this.
 *
 * The last two arguments are the number of faults tolerated and the function
 * for the redundancy calculation.  The undo for the redundancy calculation
 * is assumed to be null.
 *
 *****************************************************************************/

void rf_CommonCreateParityLoggingLargeWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList,
  int                    nfaults,
  int                  (*redFunc)(RF_DagNode_t *))
{
  RF_DagNode_t *nodes, *wndNodes, *rodNodes = NULL, *syncNode, *xorNode;
  RF_DagNode_t *lpoNode, *blockNode, *unblockNode, *termNode;
  int nWndNodes, nRodNodes, i;
  RF_RaidLayout_t *layoutPtr = &(raidPtr->Layout);
  RF_AccessStripeMapHeader_t *new_asm_h[2];
  int nodeNum, asmNum;
  RF_ReconUnitNum_t which_ru;
  char *sosBuffer, *eosBuffer;
  RF_PhysDiskAddr_t *pda;
  RF_StripeNum_t parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);

  if (rf_dagDebug)
    printf("[Creating parity-logging large-write DAG]\n");
  RF_ASSERT(nfaults == 1); /* this architecture is only single-fault tolerant */
  dag_h->creator = "ParityLoggingLargeWriteDAG";

  /* alloc the Wnd nodes, the xor node, and the Lpo node */
  nWndNodes = asmap->numStripeUnitsAccessed;
  RF_CallocAndAdd(nodes, nWndNodes + 6, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
  i = 0;
  wndNodes = &nodes[i]; i += nWndNodes;
  xorNode = &nodes[i]; i += 1;
  lpoNode = &nodes[i]; i += 1;
  blockNode = &nodes[i]; i += 1;
  syncNode = &nodes[i]; i += 1;
  unblockNode = &nodes[i]; i += 1;
  termNode = &nodes[i]; i += 1;

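  /* the commit point comprises the Wnd nodes plus the Xor node, hence
   * nWndNodes + 1 commit nodes (both are initialized with commit == RF_TRUE
   * below) */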
  dag_h->numCommitNodes = nWndNodes + 1;
  dag_h->numCommits = 0;
  dag_h->numSuccedents = 1;

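  /* map the portions of the stripe that lie outside the access; the
   * resulting new_asm_h entries (start-of-stripe and end-of-stripe, read
   * into sosBuffer/eosBuffer) supply the Rod (read old data) nodes */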
  rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
  if (nRodNodes > 0)
    RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);

  /* begin node initialization */
  rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes + 1, 0, 0, 0, dag_h, "Nil", allocList);
  rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, nWndNodes + 1, 0, 0, dag_h, "Nil", allocList);
  rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes + 1, 0, 0, dag_h, "Nil", allocList);
  rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);

  /* initialize the Rod nodes */
  for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
    if (new_asm_h[asmNum]) {
      pda = new_asm_h[asmNum]->stripeMap->physInfo;
      while (pda) {
        rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList);
        rodNodes[nodeNum].params[0].p = pda;
        rodNodes[nodeNum].params[1].p = pda->bufPtr;
        rodNodes[nodeNum].params[2].v = parityStripeID;
        rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
        nodeNum++;
        pda = pda->next;
      }
    }
  }
  RF_ASSERT(nodeNum == nRodNodes);

  /* initialize the Wnd nodes */
  pda = asmap->physInfo;
  for (i = 0; i < nWndNodes; i++) {
    rf_InitNode(&wndNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
    RF_ASSERT(pda != NULL);
    wndNodes[i].params[0].p = pda;
    wndNodes[i].params[1].p = pda->bufPtr;
    wndNodes[i].params[2].v = parityStripeID;
    wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    pda = pda->next;
  }

  /* initialize the redundancy node */
  rf_InitNode(xorNode, rf_wait, RF_TRUE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 1, 2 * (nWndNodes + nRodNodes) + 1, 1, dag_h, "Xr ", allocList);
  xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
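  /* xor node params: one {pda, buffer} pair per Wnd node, then one pair per
   * Rod node, then a final pointer to the raidPtr, for a total of
   * 2 * (nWndNodes + nRodNodes) + 1 params */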
  for (i = 0; i < nWndNodes; i++) {
    xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */
    xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */
  }
  for (i = 0; i < nRodNodes; i++) {
    xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */
    xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */
  }
  xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; /* xor node needs to get at RAID information */

  /* look for a Rod node that reads a complete SU.  If none, alloc a buffer
   * to receive the parity info.  Note that we can't use a new data buffer
   * because it will not have been written when the xor occurs. */
  for (i = 0; i < nRodNodes; i++)
    if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
      break;
  if (i == nRodNodes) {
    RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
  } else {
    xorNode->results[0] = rodNodes[i].params[1].p;
  }

  /* initialize the Lpo node */
  rf_InitNode(lpoNode, rf_wait, RF_FALSE, rf_ParityLogOverwriteFunc, rf_ParityLogOverwriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpo", allocList);

  lpoNode->params[0].p = asmap->parityInfo;
  lpoNode->params[1].p = xorNode->results[0];
  RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must describe entire parity unit */

  /* connect nodes to form graph */

  /* connect dag header to block node */
  RF_ASSERT(dag_h->numSuccedents == 1);
  RF_ASSERT(blockNode->numAntecedents == 0);
  dag_h->succedents[0] = blockNode;

  /* connect the block node to the Rod nodes */
  RF_ASSERT(blockNode->numSuccedents == nRodNodes + 1);
  for (i = 0; i < nRodNodes; i++) {
    RF_ASSERT(rodNodes[i].numAntecedents == 1);
    blockNode->succedents[i] = &rodNodes[i];
    rodNodes[i].antecedents[0] = blockNode;
    rodNodes[i].antType[0] = rf_control;
  }

  /* connect the block node to the sync node */
  /* necessary if nRodNodes == 0 */
  RF_ASSERT(syncNode->numAntecedents == nRodNodes + 1);
  blockNode->succedents[nRodNodes] = syncNode;
  syncNode->antecedents[0] = blockNode;
  syncNode->antType[0] = rf_control;

  /* connect the Rod nodes to the sync node */
  for (i = 0; i < nRodNodes; i++) {
    rodNodes[i].succedents[0] = syncNode;
    syncNode->antecedents[1 + i] = &rodNodes[i];
    syncNode->antType[1 + i] = rf_control;
  }

  /* connect the sync node to the xor node */
  RF_ASSERT(syncNode->numSuccedents == nWndNodes + 1);
  RF_ASSERT(xorNode->numAntecedents == 1);
  syncNode->succedents[0] = xorNode;
  xorNode->antecedents[0] = syncNode;
  xorNode->antType[0] = rf_trueData; /* carry forward from sync */

  /* connect the sync node to the Wnd nodes */
  for (i = 0; i < nWndNodes; i++) {
    RF_ASSERT(wndNodes[i].numAntecedents == 1);
    syncNode->succedents[1 + i] = &wndNodes[i];
    wndNodes[i].antecedents[0] = syncNode;
    wndNodes[i].antType[0] = rf_control;
  }

  /* connect the xor node to the Lpo node */
  RF_ASSERT(xorNode->numSuccedents == 1);
  RF_ASSERT(lpoNode->numAntecedents == 1);
  xorNode->succedents[0] = lpoNode;
  lpoNode->antecedents[0] = xorNode;
  lpoNode->antType[0] = rf_trueData;

  /* connect the Wnd nodes to the unblock node */
  RF_ASSERT(unblockNode->numAntecedents == nWndNodes + 1);
  for (i = 0; i < nWndNodes; i++) {
    RF_ASSERT(wndNodes[i].numSuccedents == 1);
    wndNodes[i].succedents[0] = unblockNode;
    unblockNode->antecedents[i] = &wndNodes[i];
    unblockNode->antType[i] = rf_control;
  }

  /* connect the Lpo node to the unblock node */
  RF_ASSERT(lpoNode->numSuccedents == 1);
  lpoNode->succedents[0] = unblockNode;
  unblockNode->antecedents[nWndNodes] = lpoNode;
  unblockNode->antType[nWndNodes] = rf_control;

  /* connect unblock node to terminator */
  RF_ASSERT(unblockNode->numSuccedents == 1);
  RF_ASSERT(termNode->numAntecedents == 1);
  RF_ASSERT(termNode->numSuccedents == 0);
  unblockNode->succedents[0] = termNode;
  termNode->antecedents[0] = unblockNode;
  termNode->antType[0] = rf_control;
}


/******************************************************************************
 *
 * creates a DAG to perform a small-write operation (either raid 5 or pq),
 * which is as follows:
 *
 *                       Header
 *                         |
 *                       Block
 *                     / |  ... \   \
 *                    /  |       \   \
 *                  Rod  Rod     Rod  Rop
 *                  | \ /| \    / | \/ |
 *                  |  | |  \  /  | /\ |
 *                  Wnd  Wnd     Wnd   X
 *                   |    \      /     |
 *                   |     \    /      |
 *                    \     \  /      Lpo
 *                     \     \ /      /
 *                      +-> Unblock <-+
 *                            |
 *                            T
 *
 * R = Read, W = Write, X = Xor, o = old, n = new, d = data, p = parity.
 * When the access spans a stripe unit boundary and is less than one SU in
 * size, there will be two Rop -- X -- Wnp branches.  I call this the
 * "double-XOR" case.
 * The second output from each Rod node goes to the X node.  In the
 * double-XOR case, there are exactly 2 Rod nodes, and each sends one output
 * to one X node.
 * There is one Rod -- Wnd -- T branch for each stripe unit being updated.
 *
 * The block and unblock nodes are unused.  See comment above
 * CreateFaultFreeReadDAG.
 *
 * Note: this DAG ignores all the optimizations related to making the RMWs
 * atomic.  It also has the nasty property that none of the buffers allocated
 * for reading old data & parity can be freed until the XOR node fires.
 * Need to fix this.
 *
 * A null qfuncs indicates single fault tolerant.
 *****************************************************************************/

void rf_CommonCreateParityLoggingSmallWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList,
  RF_RedFuncs_t         *pfuncs,
  RF_RedFuncs_t         *qfuncs)
{
  RF_DagNode_t *xorNodes, *blockNode, *unblockNode, *nodes;
  RF_DagNode_t *readDataNodes, *readParityNodes;
  RF_DagNode_t *writeDataNodes, *lpuNodes;
  RF_DagNode_t *unlockDataNodes = NULL, *termNode;
  RF_PhysDiskAddr_t *pda = asmap->physInfo;
  int numDataNodes = asmap->numStripeUnitsAccessed;
  int numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
  int i, j, nNodes, totalNumNodes;
  RF_ReconUnitNum_t which_ru;
  int (*func)(RF_DagNode_t *node), (*undoFunc)(RF_DagNode_t *node);
  int (*qfunc)(RF_DagNode_t *node);
  char *name, *qname;
  RF_StripeNum_t parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
  long nfaults = qfuncs ? 2 : 1;
  int lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */

  if (rf_dagDebug)
    printf("[Creating parity-logging small-write DAG]\n");
  RF_ASSERT(numDataNodes > 0);
  RF_ASSERT(nfaults == 1);
  dag_h->creator = "ParityLoggingSmallWriteDAG";

  /* DAG creation occurs in four steps:
     1. count the number of nodes in the DAG
     2. create the nodes
     3. initialize the nodes
     4. connect the nodes */

  /* Step 1. compute number of nodes in the graph */

  /* number of nodes:
       a read and write for each data unit
       a redundancy computation node for each parity node
       a read and Lpu for each parity unit
       a block and unblock node (2)
       a terminator node
       if atomic RMW
         an unlock node for each data unit */
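  /* e.g., a write touching one data unit within one parity stripe
   * (numDataNodes == 1, numParityNodes == 1) yields 2 + 1 + 2 + 3 = 8 nodes,
   * plus 1 unlock node if atomic RMW is enabled */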
  totalNumNodes = (2 * numDataNodes) + numParityNodes + (2 * numParityNodes) + 3;
  if (lu_flag)
    totalNumNodes += numDataNodes;

  nNodes = numDataNodes + numParityNodes;
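  /* nNodes is the fan-out/fan-in within the body of the DAG: each Rod/Rop
   * node has nNodes succedents and each Wnd/Xor node has nNodes antecedents
   * (see the rf_InitNode calls below) */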

  dag_h->numCommitNodes = numDataNodes + numParityNodes;
  dag_h->numCommits = 0;
  dag_h->numSuccedents = 1;

  /* Step 2. create the nodes */
  RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
  i = 0;
  blockNode = &nodes[i]; i += 1;
  unblockNode = &nodes[i]; i += 1;
  readDataNodes = &nodes[i]; i += numDataNodes;
  readParityNodes = &nodes[i]; i += numParityNodes;
  writeDataNodes = &nodes[i]; i += numDataNodes;
  lpuNodes = &nodes[i]; i += numParityNodes;
  xorNodes = &nodes[i]; i += numParityNodes;
  termNode = &nodes[i]; i += 1;
  if (lu_flag) {
    unlockDataNodes = &nodes[i]; i += numDataNodes;
  }
  RF_ASSERT(i == totalNumNodes);

  /* Step 3. initialize the nodes */
  /* initialize block node (Nil) */
  rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);

  /* initialize unblock node (Nil) */
  rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, nNodes, 0, 0, dag_h, "Nil", allocList);

  /* initialize terminator node (Trm) */
  rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);

  /* initialize nodes which read old data (Rod) */
  for (i = 0; i < numDataNodes; i++) {
    rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, nNodes, 1, 4, 0, dag_h, "Rod", allocList);
    RF_ASSERT(pda != NULL);
    readDataNodes[i].params[0].p = pda; /* physical disk addr desc */
    readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old data */
    readDataNodes[i].params[2].v = parityStripeID;
    readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
    pda = pda->next;
    readDataNodes[i].propList[0] = NULL;
    readDataNodes[i].propList[1] = NULL;
  }

  /* initialize nodes which read old parity (Rop) */
  pda = asmap->parityInfo;
  for (i = 0; i < numParityNodes; i++) {
    RF_ASSERT(pda != NULL);
    rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, nNodes, 1, 4, 0, dag_h, "Rop", allocList);
    readParityNodes[i].params[0].p = pda;
    readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old parity */
    readParityNodes[i].params[2].v = parityStripeID;
    readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    readParityNodes[i].propList[0] = NULL;
    pda = pda->next;
  }

  /* initialize nodes which write new data (Wnd) */
  pda = asmap->physInfo;
  for (i = 0; i < numDataNodes; i++) {
    RF_ASSERT(pda != NULL);
    rf_InitNode(&writeDataNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, nNodes, 4, 0, dag_h, "Wnd", allocList);
    writeDataNodes[i].params[0].p = pda;         /* physical disk addr desc */
    writeDataNodes[i].params[1].p = pda->bufPtr; /* buffer holding new data to be written */
    writeDataNodes[i].params[2].v = parityStripeID;
    writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);

    if (lu_flag) {
      /* initialize node to unlock the disk queue */
      rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
      unlockDataNodes[i].params[0].p = pda; /* physical disk addr desc */
      unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
    }
    pda = pda->next;
  }


  /* initialize nodes which compute new parity */
  /* We use the simple XOR func in the double-XOR case, and when we're
   * accessing only a portion of one stripe unit.  The distinction between
   * the two is that the regular XOR func assumes the targbuf is a full SU
   * in size, and examines the pda associated with the buffer to decide
   * where within the buffer to XOR the data, whereas the simple XOR func
   * just XORs the data into the start of the buffer. */
  if ((numParityNodes == 2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
    func = pfuncs->simple;
    undoFunc = rf_NullNodeUndoFunc;
    name = pfuncs->SimpleName;
    if (qfuncs) {
      qfunc = qfuncs->simple;
      qname = qfuncs->SimpleName;
    }
  } else {
    func = pfuncs->regular;
    undoFunc = rf_NullNodeUndoFunc;
    name = pfuncs->RegularName;
    if (qfuncs) {
      qfunc = qfuncs->regular;
      qname = qfuncs->RegularName;
    }
  }
  /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop} nodes, and raidPtr */
  if (numParityNodes == 2) { /* double-xor case */
    for (i = 0; i < numParityNodes; i++) {
      rf_InitNode(&xorNodes[i], rf_wait, RF_TRUE, func, undoFunc, NULL, 1, nNodes, 7, 1, dag_h, name, allocList); /* no wakeup func for xor */
      xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
      xorNodes[i].params[0] = readDataNodes[i].params[0];
      xorNodes[i].params[1] = readDataNodes[i].params[1];
      xorNodes[i].params[2] = readParityNodes[i].params[0];
      xorNodes[i].params[3] = readParityNodes[i].params[1];
      xorNodes[i].params[4] = writeDataNodes[i].params[0];
      xorNodes[i].params[5] = writeDataNodes[i].params[1];
      xorNodes[i].params[6].p = raidPtr;
      xorNodes[i].results[0] = readParityNodes[i].params[1].p; /* use old parity buf as target buf */
    }
  } else {
    /* there is only one xor node in this case */
    rf_InitNode(&xorNodes[0], rf_wait, RF_TRUE, func, undoFunc, NULL, 1, nNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
    xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
    for (i = 0; i < numDataNodes + 1; i++) {
      /* set up params related to Rod and Rop nodes */
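      /* note: when i == numDataNodes this indexes readDataNodes[numDataNodes],
       * which is readParityNodes[0] -- the Rod and Rop node arrays are
       * adjacent in the nodes array allocated in Step 2 */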
      xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */
      xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */
    }
    for (i = 0; i < numDataNodes; i++) {
      /* set up params related to Wnd nodes */
      xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */
      xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */
    }
    xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get at RAID information */
    xorNodes[0].results[0] = readParityNodes[0].params[1].p;
  }

  /* initialize the log node(s) */
  pda = asmap->parityInfo;
  for (i = 0; i < numParityNodes; i++) {
    RF_ASSERT(pda);
    rf_InitNode(&lpuNodes[i], rf_wait, RF_FALSE, rf_ParityLogUpdateFunc, rf_ParityLogUpdateUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpu", allocList);
    lpuNodes[i].params[0].p = pda;                    /* PhysDiskAddr of parity */
    lpuNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer to parity */
    pda = pda->next;
  }


  /* Step 4. connect the nodes */

  /* connect header to block node */
  RF_ASSERT(dag_h->numSuccedents == 1);
  RF_ASSERT(blockNode->numAntecedents == 0);
  dag_h->succedents[0] = blockNode;

  /* connect block node to read old data nodes */
  RF_ASSERT(blockNode->numSuccedents == (numDataNodes + numParityNodes));
  for (i = 0; i < numDataNodes; i++) {
    blockNode->succedents[i] = &readDataNodes[i];
    RF_ASSERT(readDataNodes[i].numAntecedents == 1);
    readDataNodes[i].antecedents[0] = blockNode;
    readDataNodes[i].antType[0] = rf_control;
  }

  /* connect block node to read old parity nodes */
  for (i = 0; i < numParityNodes; i++) {
    blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
    RF_ASSERT(readParityNodes[i].numAntecedents == 1);
    readParityNodes[i].antecedents[0] = blockNode;
    readParityNodes[i].antType[0] = rf_control;
  }

  /* connect read old data nodes to write new data nodes */
  for (i = 0; i < numDataNodes; i++) {
    RF_ASSERT(readDataNodes[i].numSuccedents == numDataNodes + numParityNodes);
    for (j = 0; j < numDataNodes; j++) {
      RF_ASSERT(writeDataNodes[j].numAntecedents == numDataNodes + numParityNodes);
      readDataNodes[i].succedents[j] = &writeDataNodes[j];
      writeDataNodes[j].antecedents[i] = &readDataNodes[i];
      if (i == j)
        writeDataNodes[j].antType[i] = rf_antiData;
      else
        writeDataNodes[j].antType[i] = rf_control;
    }
  }

  /* connect read old data nodes to xor nodes */
  for (i = 0; i < numDataNodes; i++)
    for (j = 0; j < numParityNodes; j++) {
      RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
      readDataNodes[i].succedents[numDataNodes + j] = &xorNodes[j];
      xorNodes[j].antecedents[i] = &readDataNodes[i];
      xorNodes[j].antType[i] = rf_trueData;
    }

  /* connect read old parity nodes to write new data nodes */
  for (i = 0; i < numParityNodes; i++) {
    RF_ASSERT(readParityNodes[i].numSuccedents == numDataNodes + numParityNodes);
    for (j = 0; j < numDataNodes; j++) {
      readParityNodes[i].succedents[j] = &writeDataNodes[j];
      writeDataNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
      writeDataNodes[j].antType[numDataNodes + i] = rf_control;
    }
  }

  /* connect read old parity nodes to xor nodes */
  for (i = 0; i < numParityNodes; i++)
    for (j = 0; j < numParityNodes; j++) {
      readParityNodes[i].succedents[numDataNodes + j] = &xorNodes[j];
      xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
      xorNodes[j].antType[numDataNodes + i] = rf_trueData;
    }

  /* connect xor nodes to parity log update (Lpu) nodes */
  for (i = 0; i < numParityNodes; i++) {
    RF_ASSERT(xorNodes[i].numSuccedents == 1);
    RF_ASSERT(lpuNodes[i].numAntecedents == 1);
    xorNodes[i].succedents[0] = &lpuNodes[i];
    lpuNodes[i].antecedents[0] = &xorNodes[i];
    lpuNodes[i].antType[0] = rf_trueData;
  }

  for (i = 0; i < numDataNodes; i++) {
    if (lu_flag) {
      /* connect write new data nodes to unlock nodes */
      RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
      RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
      writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
      unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
      unlockDataNodes[i].antType[0] = rf_control;

      /* connect unlock nodes to unblock node */
      RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
      RF_ASSERT(unblockNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
      unlockDataNodes[i].succedents[0] = unblockNode;
      unblockNode->antecedents[i] = &unlockDataNodes[i];
      unblockNode->antType[i] = rf_control;
    } else {
      /* connect write new data nodes to unblock node */
      RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
      RF_ASSERT(unblockNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
      writeDataNodes[i].succedents[0] = unblockNode;
      unblockNode->antecedents[i] = &writeDataNodes[i];
      unblockNode->antType[i] = rf_control;
    }
  }

  /* connect the Lpu nodes to the unblock node */
  for (i = 0; i < numParityNodes; i++) {
    RF_ASSERT(lpuNodes[i].numSuccedents == 1);
    lpuNodes[i].succedents[0] = unblockNode;
    unblockNode->antecedents[numDataNodes + i] = &lpuNodes[i];
    unblockNode->antType[numDataNodes + i] = rf_control;
  }

  /* connect unblock node to terminator */
  RF_ASSERT(unblockNode->numSuccedents == 1);
  RF_ASSERT(termNode->numAntecedents == 1);
  RF_ASSERT(termNode->numSuccedents == 0);
  unblockNode->succedents[0] = termNode;
  termNode->antecedents[0] = unblockNode;
  termNode->antType[0] = rf_control;
}


void rf_CreateParityLoggingSmallWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList,
  RF_RedFuncs_t         *pfuncs,
  RF_RedFuncs_t         *qfuncs)
{
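  /* note: pfuncs and qfuncs are ignored; parity logging is single-fault
   * tolerant, so the standard xor functions are always used */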
  dag_h->creator = "ParityLoggingSmallWriteDAG";
  rf_CommonCreateParityLoggingSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, &rf_xorFuncs, NULL);
}


void rf_CreateParityLoggingLargeWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList,
  int                    nfaults,
  int                  (*redFunc)(RF_DagNode_t *))
{
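  /* note: nfaults and redFunc are likewise ignored; the DAG is always built
   * single-fault tolerant with rf_RegularXorFunc */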
  dag_h->creator = "ParityLoggingLargeWriteDAG";
  rf_CommonCreateParityLoggingLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 1, rf_RegularXorFunc);
}

#endif /* RF_INCLUDE_PARITYLOGGING > 0 */