rf_dagffwr.c revision 1.1 1 1.1 oster /* $NetBSD: rf_dagffwr.c,v 1.1 1998/11/13 04:20:27 oster Exp $ */
2 1.1 oster /*
3 1.1 oster * Copyright (c) 1995 Carnegie-Mellon University.
4 1.1 oster * All rights reserved.
5 1.1 oster *
6 1.1 oster * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
7 1.1 oster *
8 1.1 oster * Permission to use, copy, modify and distribute this software and
9 1.1 oster * its documentation is hereby granted, provided that both the copyright
10 1.1 oster * notice and this permission notice appear in all copies of the
11 1.1 oster * software, derivative works or modified versions, and any portions
12 1.1 oster * thereof, and that both notices appear in supporting documentation.
13 1.1 oster *
14 1.1 oster * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
15 1.1 oster * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
16 1.1 oster * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
17 1.1 oster *
18 1.1 oster * Carnegie Mellon requests users of this software to return to
19 1.1 oster *
20 1.1 oster * Software Distribution Coordinator or Software.Distribution (at) CS.CMU.EDU
21 1.1 oster * School of Computer Science
22 1.1 oster * Carnegie Mellon University
23 1.1 oster * Pittsburgh PA 15213-3890
24 1.1 oster *
25 1.1 oster * any improvements or extensions that they make and grant Carnegie the
26 1.1 oster * rights to redistribute these changes.
27 1.1 oster */
28 1.1 oster
29 1.1 oster /*
30 1.1 oster  * rf_dagffwr.c
31 1.1 oster *
32 1.1 oster * code for creating fault-free DAGs
33 1.1 oster *
34 1.1 oster * :
35 1.1 oster * Log: rf_dagffwr.c,v
36 1.1 oster * Revision 1.19 1996/07/31 15:35:24 jimz
37 1.1 oster * evenodd changes; bugfixes for double-degraded archs, generalize
38 1.1 oster * some formerly PQ-only functions
39 1.1 oster *
40 1.1 oster * Revision 1.18 1996/07/28 20:31:39 jimz
41 1.1 oster * i386netbsd port
42 1.1 oster * true/false fixup
43 1.1 oster *
44 1.1 oster * Revision 1.17 1996/07/27 18:40:24 jimz
45 1.1 oster * cleanup sweep
46 1.1 oster *
47 1.1 oster * Revision 1.16 1996/07/22 19:52:16 jimz
48 1.1 oster * switched node params to RF_DagParam_t, a union of
49 1.1 oster * a 64-bit int and a void *, for better portability
50 1.1 oster * attempted hpux port, but failed partway through for
51 1.1 oster * lack of a single C compiler capable of compiling all
52 1.1 oster * source files
53 1.1 oster *
54 1.1 oster * Revision 1.15 1996/06/11 01:27:50 jimz
55 1.1 oster * Fixed bug where diskthread shutdown would crash or hang. This
56 1.1 oster * turned out to be two distinct bugs:
57 1.1 oster * (1) [crash] The thread shutdown code wasn't properly waiting for
58 1.1 oster * all the diskthreads to complete. This caused diskthreads that were
59 1.1 oster * exiting+cleaning up to unlock a destroyed mutex.
60 1.1 oster * (2) [hang] TerminateDiskQueues wasn't locking, and DiskIODequeue
61 1.1 oster * only checked for termination _after_ a wakeup if the queues were
62 1.1 oster * empty. This was a race where the termination wakeup could be lost
63 1.1 oster * by the dequeueing thread, and the system would hang waiting for the
64 1.1 oster * thread to exit, while the thread waited for an I/O or a signal to
65 1.1 oster * check the termination flag.
66 1.1 oster *
67 1.1 oster * Revision 1.14 1996/06/10 22:24:01 wvcii
68 1.1 oster * added write dags which do not have a commit node and are
69 1.1 oster * used in forward and backward error recovery experiments.
70 1.1 oster *
71 1.1 oster * Revision 1.13 1996/06/07 22:26:27 jimz
72 1.1 oster * type-ify which_ru (RF_ReconUnitNum_t)
73 1.1 oster *
74 1.1 oster * Revision 1.12 1996/06/07 21:33:04 jimz
75 1.1 oster * begin using consistent types for sector numbers,
76 1.1 oster * stripe numbers, row+col numbers, recon unit numbers
77 1.1 oster *
78 1.1 oster * Revision 1.11 1996/05/31 22:26:54 jimz
79 1.1 oster * fix a lot of mapping problems, memory allocation problems
80 1.1 oster * found some weird lock issues, fixed 'em
81 1.1 oster * more code cleanup
82 1.1 oster *
83 1.1 oster * Revision 1.10 1996/05/30 11:29:41 jimz
84 1.1 oster * Numerous bug fixes. Stripe lock release code disagreed with the taking code
85 1.1 oster * about when stripes should be locked (I made it consistent: no parity, no lock)
86 1.1 oster * There was a lot of extra serialization of I/Os which I've removed- a lot of
87 1.1 oster * it was to calculate values for the cache code, which is no longer with us.
88 1.1 oster * More types, function, macro cleanup. Added code to properly quiesce the array
89 1.1 oster * on shutdown. Made a lot of stuff array-specific which was (bogusly) general
90 1.1 oster * before. Fixed memory allocation, freeing bugs.
91 1.1 oster *
92 1.1 oster * Revision 1.9 1996/05/27 18:56:37 jimz
93 1.1 oster * more code cleanup
94 1.1 oster * better typing
95 1.1 oster * compiles in all 3 environments
96 1.1 oster *
97 1.1 oster * Revision 1.8 1996/05/24 22:17:04 jimz
98 1.1 oster * continue code + namespace cleanup
99 1.1 oster * typed a bunch of flags
100 1.1 oster *
101 1.1 oster * Revision 1.7 1996/05/24 04:28:55 jimz
102 1.1 oster * release cleanup ckpt
103 1.1 oster *
104 1.1 oster * Revision 1.6 1996/05/23 21:46:35 jimz
105 1.1 oster * checkpoint in code cleanup (release prep)
106 1.1 oster * lots of types, function names have been fixed
107 1.1 oster *
108 1.1 oster * Revision 1.5 1996/05/23 00:33:23 jimz
109 1.1 oster * code cleanup: move all debug decls to rf_options.c, all extern
110 1.1 oster * debug decls to rf_options.h, all debug vars preceded by rf_
111 1.1 oster *
112 1.1 oster * Revision 1.4 1996/05/18 19:51:34 jimz
113 1.1 oster * major code cleanup- fix syntax, make some types consistent,
114 1.1 oster * add prototypes, clean out dead code, et cetera
115 1.1 oster *
116 1.1 oster * Revision 1.3 1996/05/15 23:23:12 wvcii
117 1.1 oster * fixed bug in small write read old q node succedent initialization
118 1.1 oster *
119 1.1 oster * Revision 1.2 1996/05/08 21:01:24 jimz
120 1.1 oster * fixed up enum type names that were conflicting with other
121 1.1 oster * enums and function names (ie, "panic")
122 1.1 oster * future naming trends will be towards RF_ and rf_ for
123 1.1 oster * everything raidframe-related
124 1.1 oster *
125 1.1 oster * Revision 1.1 1996/05/03 19:20:45 wvcii
126 1.1 oster * Initial revision
127 1.1 oster *
128 1.1 oster */
129 1.1 oster
130 1.1 oster #include "rf_types.h"
131 1.1 oster #include "rf_raid.h"
132 1.1 oster #include "rf_dag.h"
133 1.1 oster #include "rf_dagutils.h"
134 1.1 oster #include "rf_dagfuncs.h"
135 1.1 oster #include "rf_threadid.h"
136 1.1 oster #include "rf_debugMem.h"
137 1.1 oster #include "rf_dagffrd.h"
138 1.1 oster #include "rf_memchunk.h"
139 1.1 oster #include "rf_general.h"
140 1.1 oster #include "rf_dagffwr.h"
141 1.1 oster
142 1.1 oster /******************************************************************************
143 1.1 oster *
144 1.1 oster * General comments on DAG creation:
145 1.1 oster *
146 1.1 oster * All DAGs in this file use roll-away error recovery. Each DAG has a single
147 1.1 oster * commit node, usually called "Cmt." If an error occurs before the Cmt node
148 1.1 oster * is reached, the execution engine will halt forward execution and work
149 1.1 oster * backward through the graph, executing the undo functions. Assuming that
150 1.1 oster  * each node in the graph prior to the Cmt node is undoable and atomic - or -
151 1.1 oster * does not make changes to permanent state, the graph will fail atomically.
152 1.1 oster * If an error occurs after the Cmt node executes, the engine will roll-forward
153 1.1 oster * through the graph, blindly executing nodes until it reaches the end.
154 1.1 oster * If a graph reaches the end, it is assumed to have completed successfully.
155 1.1 oster *
156 1.1 oster * A graph has only 1 Cmt node.
157 1.1 oster *
158 1.1 oster */
159 1.1 oster
160 1.1 oster
161 1.1 oster /******************************************************************************
162 1.1 oster *
163 1.1 oster * The following wrappers map the standard DAG creation interface to the
164 1.1 oster * DAG creation routines. Additionally, these wrappers enable experimentation
165 1.1 oster * with new DAG structures by providing an extra level of indirection, allowing
166 1.1 oster * the DAG creation routines to be replaced at this single point.
167 1.1 oster */
168 1.1 oster
169 1.1 oster
170 1.1 oster void rf_CreateNonRedundantWriteDAG(
171 1.1 oster RF_Raid_t *raidPtr,
172 1.1 oster RF_AccessStripeMap_t *asmap,
173 1.1 oster RF_DagHeader_t *dag_h,
174 1.1 oster void *bp,
175 1.1 oster RF_RaidAccessFlags_t flags,
176 1.1 oster RF_AllocListElem_t *allocList,
177 1.1 oster RF_IoType_t type)
178 1.1 oster {
179 1.1 oster rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
180 1.1 oster RF_IO_TYPE_WRITE);
181 1.1 oster }
182 1.1 oster
183 1.1 oster void rf_CreateRAID0WriteDAG(
184 1.1 oster RF_Raid_t *raidPtr,
185 1.1 oster RF_AccessStripeMap_t *asmap,
186 1.1 oster RF_DagHeader_t *dag_h,
187 1.1 oster void *bp,
188 1.1 oster RF_RaidAccessFlags_t flags,
189 1.1 oster RF_AllocListElem_t *allocList,
190 1.1 oster RF_IoType_t type)
191 1.1 oster {
192 1.1 oster rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
193 1.1 oster RF_IO_TYPE_WRITE);
194 1.1 oster }
195 1.1 oster
/*
 * Create a fault-free small-write DAG.
 *
 * Dispatches at compile time on the error-recovery experiment flags:
 * forward recovery (RF_FORWARD), backward recovery (RF_BACKWARD), or
 * the default roll-away scheme.  All variants use the standard XOR
 * function table (rf_xorFuncs) and pass NULL for the Q functions,
 * i.e. single-fault tolerance.
 */
void rf_CreateSmallWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList)
{
#if RF_FORWARD > 0
  /* forward error recovery: DAG without a commit node */
  rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    &rf_xorFuncs, NULL);
#else /* RF_FORWARD > 0 */
#if RF_BACKWARD > 0
  /*
   * NOTE(review): this backward-recovery branch also calls the *Fwd*
   * builder — presumably a placeholder; confirm whether a Bwd variant
   * was intended.
   */
  rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    &rf_xorFuncs, NULL);
#else /* RF_BACKWARD > 0 */
  /* "normal" rollaway */
  rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    &rf_xorFuncs, NULL);
#endif /* RF_BACKWARD > 0 */
#endif /* RF_FORWARD > 0 */
}
218 1.1 oster
/*
 * Create a fault-free large-write DAG.
 *
 * Dispatches at compile time on the error-recovery experiment flags:
 * forward recovery (RF_FORWARD), backward recovery (RF_BACKWARD), or
 * the default roll-away scheme.  All variants use nfaults == 1 with
 * rf_RegularXorFunc as the redundancy function and allow buffer
 * recycling (RF_TRUE).
 */
void rf_CreateLargeWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList)
{
#if RF_FORWARD > 0
  /* forward error recovery: DAG without a commit node */
  rf_CommonCreateLargeWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    1, rf_RegularXorFunc, RF_TRUE);
#else /* RF_FORWARD > 0 */
#if RF_BACKWARD > 0
  /*
   * NOTE(review): this backward-recovery branch also calls the *Fwd*
   * builder — presumably a placeholder; confirm whether a Bwd variant
   * was intended.
   */
  rf_CommonCreateLargeWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    1, rf_RegularXorFunc, RF_TRUE);
#else /* RF_BACKWARD > 0 */
  /* "normal" rollaway */
  rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    1, rf_RegularXorFunc, RF_TRUE);
#endif /* RF_BACKWARD > 0 */
#endif /* RF_FORWARD > 0 */
}
241 1.1 oster
242 1.1 oster
243 1.1 oster /******************************************************************************
244 1.1 oster *
245 1.1 oster * DAG creation code begins here
246 1.1 oster */
247 1.1 oster
248 1.1 oster
249 1.1 oster /******************************************************************************
250 1.1 oster *
251 1.1 oster * creates a DAG to perform a large-write operation:
252 1.1 oster *
253 1.1 oster * / Rod \ / Wnd \
254 1.1 oster * H -- block- Rod - Xor - Cmt - Wnd --- T
255 1.1 oster * \ Rod / \ Wnp /
256 1.1 oster * \[Wnq]/
257 1.1 oster *
258 1.1 oster * The XOR node also does the Q calculation in the P+Q architecture.
259 1.1 oster  * All nodes before the commit node (Cmt) are assumed to be atomic and
260 1.1 oster * undoable - or - they make no changes to permanent state.
261 1.1 oster *
262 1.1 oster * Rod = read old data
263 1.1 oster * Cmt = commit node
264 1.1 oster * Wnp = write new parity
265 1.1 oster * Wnd = write new data
266 1.1 oster * Wnq = write new "q"
267 1.1 oster * [] denotes optional segments in the graph
268 1.1 oster *
269 1.1 oster * Parameters: raidPtr - description of the physical array
270 1.1 oster * asmap - logical & physical addresses for this access
271 1.1 oster * bp - buffer ptr (holds write data)
272 1.1 oster * flags - general flags (e.g. disk locking)
273 1.1 oster * allocList - list of memory allocated in DAG creation
274 1.1 oster * nfaults - number of faults array can tolerate
275 1.1 oster * (equal to # redundancy units in stripe)
276 1.1 oster * redfuncs - list of redundancy generating functions
277 1.1 oster *
278 1.1 oster *****************************************************************************/
279 1.1 oster
280 1.1 oster void rf_CommonCreateLargeWriteDAG(
281 1.1 oster RF_Raid_t *raidPtr,
282 1.1 oster RF_AccessStripeMap_t *asmap,
283 1.1 oster RF_DagHeader_t *dag_h,
284 1.1 oster void *bp,
285 1.1 oster RF_RaidAccessFlags_t flags,
286 1.1 oster RF_AllocListElem_t *allocList,
287 1.1 oster int nfaults,
288 1.1 oster int (*redFunc)(RF_DagNode_t *),
289 1.1 oster int allowBufferRecycle)
290 1.1 oster {
291 1.1 oster RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
292 1.1 oster RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
293 1.1 oster int nWndNodes, nRodNodes, i, nodeNum, asmNum;
294 1.1 oster RF_AccessStripeMapHeader_t *new_asm_h[2];
295 1.1 oster RF_StripeNum_t parityStripeID;
296 1.1 oster char *sosBuffer, *eosBuffer;
297 1.1 oster RF_ReconUnitNum_t which_ru;
298 1.1 oster RF_RaidLayout_t *layoutPtr;
299 1.1 oster RF_PhysDiskAddr_t *pda;
300 1.1 oster
301 1.1 oster layoutPtr = &(raidPtr->Layout);
302 1.1 oster parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress,
303 1.1 oster &which_ru);
304 1.1 oster
305 1.1 oster if (rf_dagDebug) {
306 1.1 oster printf("[Creating large-write DAG]\n");
307 1.1 oster }
308 1.1 oster dag_h->creator = "LargeWriteDAG";
309 1.1 oster
310 1.1 oster dag_h->numCommitNodes = 1;
311 1.1 oster dag_h->numCommits = 0;
312 1.1 oster dag_h->numSuccedents = 1;
313 1.1 oster
314 1.1 oster /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
315 1.1 oster nWndNodes = asmap->numStripeUnitsAccessed;
316 1.1 oster RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
317 1.1 oster (RF_DagNode_t *), allocList);
318 1.1 oster i = 0;
319 1.1 oster wndNodes = &nodes[i]; i += nWndNodes;
320 1.1 oster xorNode = &nodes[i]; i += 1;
321 1.1 oster wnpNode = &nodes[i]; i += 1;
322 1.1 oster blockNode = &nodes[i]; i += 1;
323 1.1 oster commitNode = &nodes[i]; i += 1;
324 1.1 oster termNode = &nodes[i]; i += 1;
325 1.1 oster if (nfaults == 2) {
326 1.1 oster wnqNode = &nodes[i]; i += 1;
327 1.1 oster }
328 1.1 oster else {
329 1.1 oster wnqNode = NULL;
330 1.1 oster }
331 1.1 oster rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h,
332 1.1 oster &nRodNodes, &sosBuffer, &eosBuffer, allocList);
333 1.1 oster if (nRodNodes > 0) {
334 1.1 oster RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
335 1.1 oster (RF_DagNode_t *), allocList);
336 1.1 oster }
337 1.1 oster else {
338 1.1 oster rodNodes = NULL;
339 1.1 oster }
340 1.1 oster
341 1.1 oster /* begin node initialization */
342 1.1 oster if (nRodNodes > 0) {
343 1.1 oster rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
344 1.1 oster NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
345 1.1 oster }
346 1.1 oster else {
347 1.1 oster rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
348 1.1 oster NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
349 1.1 oster }
350 1.1 oster
351 1.1 oster rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL,
352 1.1 oster nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList);
353 1.1 oster rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL,
354 1.1 oster 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
355 1.1 oster
356 1.1 oster /* initialize the Rod nodes */
357 1.1 oster for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
358 1.1 oster if (new_asm_h[asmNum]) {
359 1.1 oster pda = new_asm_h[asmNum]->stripeMap->physInfo;
360 1.1 oster while (pda) {
361 1.1 oster rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,
362 1.1 oster rf_DiskReadUndoFunc,rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
363 1.1 oster "Rod", allocList);
364 1.1 oster rodNodes[nodeNum].params[0].p = pda;
365 1.1 oster rodNodes[nodeNum].params[1].p = pda->bufPtr;
366 1.1 oster rodNodes[nodeNum].params[2].v = parityStripeID;
367 1.1 oster rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
368 1.1 oster 0, 0, which_ru);
369 1.1 oster nodeNum++;
370 1.1 oster pda = pda->next;
371 1.1 oster }
372 1.1 oster }
373 1.1 oster }
374 1.1 oster RF_ASSERT(nodeNum == nRodNodes);
375 1.1 oster
376 1.1 oster /* initialize the wnd nodes */
377 1.1 oster pda = asmap->physInfo;
378 1.1 oster for (i=0; i < nWndNodes; i++) {
379 1.1 oster rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
380 1.1 oster rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
381 1.1 oster RF_ASSERT(pda != NULL);
382 1.1 oster wndNodes[i].params[0].p = pda;
383 1.1 oster wndNodes[i].params[1].p = pda->bufPtr;
384 1.1 oster wndNodes[i].params[2].v = parityStripeID;
385 1.1 oster wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
386 1.1 oster pda = pda->next;
387 1.1 oster }
388 1.1 oster
389 1.1 oster /* initialize the redundancy node */
390 1.1 oster if (nRodNodes > 0) {
391 1.1 oster rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
392 1.1 oster nRodNodes, 2 * (nWndNodes+nRodNodes) + 1, nfaults, dag_h,
393 1.1 oster "Xr ", allocList);
394 1.1 oster }
395 1.1 oster else {
396 1.1 oster rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
397 1.1 oster 1, 2 * (nWndNodes+nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
398 1.1 oster }
399 1.1 oster xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
400 1.1 oster for (i=0; i < nWndNodes; i++) {
401 1.1 oster xorNode->params[2*i+0] = wndNodes[i].params[0]; /* pda */
402 1.1 oster xorNode->params[2*i+1] = wndNodes[i].params[1]; /* buf ptr */
403 1.1 oster }
404 1.1 oster for (i=0; i < nRodNodes; i++) {
405 1.1 oster xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0]; /* pda */
406 1.1 oster xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1]; /* buf ptr */
407 1.1 oster }
408 1.1 oster /* xor node needs to get at RAID information */
409 1.1 oster xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr;
410 1.1 oster
411 1.1 oster /*
412 1.1 oster * Look for an Rod node that reads a complete SU. If none, alloc a buffer
413 1.1 oster * to receive the parity info. Note that we can't use a new data buffer
414 1.1 oster * because it will not have gotten written when the xor occurs.
415 1.1 oster */
416 1.1 oster if (allowBufferRecycle) {
417 1.1 oster for (i = 0; i < nRodNodes; i++) {
418 1.1 oster if (((RF_PhysDiskAddr_t *)rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
419 1.1 oster break;
420 1.1 oster }
421 1.1 oster }
422 1.1 oster if ((!allowBufferRecycle) || (i == nRodNodes)) {
423 1.1 oster RF_CallocAndAdd(xorNode->results[0], 1,
424 1.1 oster rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
425 1.1 oster (void *), allocList);
426 1.1 oster }
427 1.1 oster else {
428 1.1 oster xorNode->results[0] = rodNodes[i].params[1].p;
429 1.1 oster }
430 1.1 oster
431 1.1 oster /* initialize the Wnp node */
432 1.1 oster rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
433 1.1 oster rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
434 1.1 oster wnpNode->params[0].p = asmap->parityInfo;
435 1.1 oster wnpNode->params[1].p = xorNode->results[0];
436 1.1 oster wnpNode->params[2].v = parityStripeID;
437 1.1 oster wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
438 1.1 oster /* parityInfo must describe entire parity unit */
439 1.1 oster RF_ASSERT(asmap->parityInfo->next == NULL);
440 1.1 oster
441 1.1 oster if (nfaults == 2) {
442 1.1 oster /*
443 1.1 oster * We never try to recycle a buffer for the Q calcuation
444 1.1 oster * in addition to the parity. This would cause two buffers
445 1.1 oster * to get smashed during the P and Q calculation, guaranteeing
446 1.1 oster * one would be wrong.
447 1.1 oster */
448 1.1 oster RF_CallocAndAdd(xorNode->results[1], 1,
449 1.1 oster rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
450 1.1 oster (void *),allocList);
451 1.1 oster rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
452 1.1 oster rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
453 1.1 oster wnqNode->params[0].p = asmap->qInfo;
454 1.1 oster wnqNode->params[1].p = xorNode->results[1];
455 1.1 oster wnqNode->params[2].v = parityStripeID;
456 1.1 oster wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
457 1.1 oster /* parityInfo must describe entire parity unit */
458 1.1 oster RF_ASSERT(asmap->parityInfo->next == NULL);
459 1.1 oster }
460 1.1 oster
461 1.1 oster /*
462 1.1 oster * Connect nodes to form graph.
463 1.1 oster */
464 1.1 oster
465 1.1 oster /* connect dag header to block node */
466 1.1 oster RF_ASSERT(blockNode->numAntecedents == 0);
467 1.1 oster dag_h->succedents[0] = blockNode;
468 1.1 oster
469 1.1 oster if (nRodNodes > 0) {
470 1.1 oster /* connect the block node to the Rod nodes */
471 1.1 oster RF_ASSERT(blockNode->numSuccedents == nRodNodes);
472 1.1 oster RF_ASSERT(xorNode->numAntecedents == nRodNodes);
473 1.1 oster for (i = 0; i < nRodNodes; i++) {
474 1.1 oster RF_ASSERT(rodNodes[i].numAntecedents == 1);
475 1.1 oster blockNode->succedents[i] = &rodNodes[i];
476 1.1 oster rodNodes[i].antecedents[0] = blockNode;
477 1.1 oster rodNodes[i].antType[0] = rf_control;
478 1.1 oster
479 1.1 oster /* connect the Rod nodes to the Xor node */
480 1.1 oster RF_ASSERT(rodNodes[i].numSuccedents == 1);
481 1.1 oster rodNodes[i].succedents[0] = xorNode;
482 1.1 oster xorNode->antecedents[i] = &rodNodes[i];
483 1.1 oster xorNode->antType[i] = rf_trueData;
484 1.1 oster }
485 1.1 oster }
486 1.1 oster else {
487 1.1 oster /* connect the block node to the Xor node */
488 1.1 oster RF_ASSERT(blockNode->numSuccedents == 1);
489 1.1 oster RF_ASSERT(xorNode->numAntecedents == 1);
490 1.1 oster blockNode->succedents[0] = xorNode;
491 1.1 oster xorNode->antecedents[0] = blockNode;
492 1.1 oster xorNode->antType[0] = rf_control;
493 1.1 oster }
494 1.1 oster
495 1.1 oster /* connect the xor node to the commit node */
496 1.1 oster RF_ASSERT(xorNode->numSuccedents == 1);
497 1.1 oster RF_ASSERT(commitNode->numAntecedents == 1);
498 1.1 oster xorNode->succedents[0] = commitNode;
499 1.1 oster commitNode->antecedents[0] = xorNode;
500 1.1 oster commitNode->antType[0] = rf_control;
501 1.1 oster
502 1.1 oster /* connect the commit node to the write nodes */
503 1.1 oster RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
504 1.1 oster for (i = 0; i < nWndNodes; i++) {
505 1.1 oster RF_ASSERT(wndNodes->numAntecedents == 1);
506 1.1 oster commitNode->succedents[i] = &wndNodes[i];
507 1.1 oster wndNodes[i].antecedents[0] = commitNode;
508 1.1 oster wndNodes[i].antType[0] = rf_control;
509 1.1 oster }
510 1.1 oster RF_ASSERT(wnpNode->numAntecedents == 1);
511 1.1 oster commitNode->succedents[nWndNodes] = wnpNode;
512 1.1 oster wnpNode->antecedents[0]= commitNode;
513 1.1 oster wnpNode->antType[0] = rf_trueData;
514 1.1 oster if (nfaults == 2) {
515 1.1 oster RF_ASSERT(wnqNode->numAntecedents == 1);
516 1.1 oster commitNode->succedents[nWndNodes + 1] = wnqNode;
517 1.1 oster wnqNode->antecedents[0] = commitNode;
518 1.1 oster wnqNode->antType[0] = rf_trueData;
519 1.1 oster }
520 1.1 oster
521 1.1 oster /* connect the write nodes to the term node */
522 1.1 oster RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
523 1.1 oster RF_ASSERT(termNode->numSuccedents == 0);
524 1.1 oster for (i = 0; i < nWndNodes; i++) {
525 1.1 oster RF_ASSERT(wndNodes->numSuccedents == 1);
526 1.1 oster wndNodes[i].succedents[0] = termNode;
527 1.1 oster termNode->antecedents[i] = &wndNodes[i];
528 1.1 oster termNode->antType[i] = rf_control;
529 1.1 oster }
530 1.1 oster RF_ASSERT(wnpNode->numSuccedents == 1);
531 1.1 oster wnpNode->succedents[0] = termNode;
532 1.1 oster termNode->antecedents[nWndNodes] = wnpNode;
533 1.1 oster termNode->antType[nWndNodes] = rf_control;
534 1.1 oster if (nfaults == 2) {
535 1.1 oster RF_ASSERT(wnqNode->numSuccedents == 1);
536 1.1 oster wnqNode->succedents[0] = termNode;
537 1.1 oster termNode->antecedents[nWndNodes + 1] = wnqNode;
538 1.1 oster termNode->antType[nWndNodes + 1] = rf_control;
539 1.1 oster }
540 1.1 oster }
541 1.1 oster
542 1.1 oster /******************************************************************************
543 1.1 oster *
544 1.1 oster * creates a DAG to perform a small-write operation (either raid 5 or pq),
545 1.1 oster * which is as follows:
546 1.1 oster *
547 1.1 oster * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
548 1.1 oster * \- Rod X / \----> Wnd [Und]-/
549 1.1 oster * [\- Rod X / \---> Wnd [Und]-/]
550 1.1 oster * [\- Roq -> Q / \--> Wnq [Unq]-/]
551 1.1 oster *
552 1.1 oster * Rop = read old parity
553 1.1 oster * Rod = read old data
554 1.1 oster * Roq = read old "q"
555 1.1 oster * Cmt = commit node
556 1.1 oster * Und = unlock data disk
557 1.1 oster * Unp = unlock parity disk
558 1.1 oster * Unq = unlock q disk
559 1.1 oster * Wnp = write new parity
560 1.1 oster * Wnd = write new data
561 1.1 oster * Wnq = write new "q"
562 1.1 oster * [ ] denotes optional segments in the graph
563 1.1 oster *
564 1.1 oster * Parameters: raidPtr - description of the physical array
565 1.1 oster * asmap - logical & physical addresses for this access
566 1.1 oster * bp - buffer ptr (holds write data)
567 1.1 oster * flags - general flags (e.g. disk locking)
568 1.1 oster * allocList - list of memory allocated in DAG creation
569 1.1 oster * pfuncs - list of parity generating functions
570 1.1 oster * qfuncs - list of q generating functions
571 1.1 oster *
572 1.1 oster * A null qfuncs indicates single fault tolerant
573 1.1 oster *****************************************************************************/
574 1.1 oster
575 1.1 oster void rf_CommonCreateSmallWriteDAG(
576 1.1 oster RF_Raid_t *raidPtr,
577 1.1 oster RF_AccessStripeMap_t *asmap,
578 1.1 oster RF_DagHeader_t *dag_h,
579 1.1 oster void *bp,
580 1.1 oster RF_RaidAccessFlags_t flags,
581 1.1 oster RF_AllocListElem_t *allocList,
582 1.1 oster RF_RedFuncs_t *pfuncs,
583 1.1 oster RF_RedFuncs_t *qfuncs)
584 1.1 oster {
585 1.1 oster RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
586 1.1 oster RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
587 1.1 oster RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
588 1.1 oster RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
589 1.1 oster int i, j, nNodes, totalNumNodes, lu_flag;
590 1.1 oster RF_ReconUnitNum_t which_ru;
591 1.1 oster int (*func)(RF_DagNode_t *), (*undoFunc)(RF_DagNode_t *);
592 1.1 oster int (*qfunc)(RF_DagNode_t *);
593 1.1 oster int numDataNodes, numParityNodes;
594 1.1 oster RF_StripeNum_t parityStripeID;
595 1.1 oster RF_PhysDiskAddr_t *pda;
596 1.1 oster char *name, *qname;
597 1.1 oster long nfaults;
598 1.1 oster
599 1.1 oster nfaults = qfuncs ? 2 : 1;
600 1.1 oster lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
601 1.1 oster
602 1.1 oster parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
603 1.1 oster asmap->raidAddress, &which_ru);
604 1.1 oster pda = asmap->physInfo;
605 1.1 oster numDataNodes = asmap->numStripeUnitsAccessed;
606 1.1 oster numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
607 1.1 oster
608 1.1 oster if (rf_dagDebug) {
609 1.1 oster printf("[Creating small-write DAG]\n");
610 1.1 oster }
611 1.1 oster RF_ASSERT(numDataNodes > 0);
612 1.1 oster dag_h->creator = "SmallWriteDAG";
613 1.1 oster
614 1.1 oster dag_h->numCommitNodes = 1;
615 1.1 oster dag_h->numCommits = 0;
616 1.1 oster dag_h->numSuccedents = 1;
617 1.1 oster
618 1.1 oster /*
619 1.1 oster * DAG creation occurs in four steps:
620 1.1 oster * 1. count the number of nodes in the DAG
621 1.1 oster * 2. create the nodes
622 1.1 oster * 3. initialize the nodes
623 1.1 oster * 4. connect the nodes
624 1.1 oster */
625 1.1 oster
626 1.1 oster /*
627 1.1 oster * Step 1. compute number of nodes in the graph
628 1.1 oster */
629 1.1 oster
630 1.1 oster /* number of nodes:
631 1.1 oster * a read and write for each data unit
632 1.1 oster * a redundancy computation node for each parity node (nfaults * nparity)
633 1.1 oster * a read and write for each parity unit
634 1.1 oster * a block and commit node (2)
635 1.1 oster * a terminate node
636 1.1 oster * if atomic RMW
637 1.1 oster * an unlock node for each data unit, redundancy unit
638 1.1 oster */
639 1.1 oster totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
640 1.1 oster + (nfaults * 2 * numParityNodes) + 3;
641 1.1 oster if (lu_flag) {
642 1.1 oster totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
643 1.1 oster }
644 1.1 oster
645 1.1 oster /*
646 1.1 oster * Step 2. create the nodes
647 1.1 oster */
648 1.1 oster RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
649 1.1 oster (RF_DagNode_t *), allocList);
650 1.1 oster i = 0;
651 1.1 oster blockNode = &nodes[i]; i += 1;
652 1.1 oster commitNode = &nodes[i]; i += 1;
653 1.1 oster readDataNodes = &nodes[i]; i += numDataNodes;
654 1.1 oster readParityNodes = &nodes[i]; i += numParityNodes;
655 1.1 oster writeDataNodes = &nodes[i]; i += numDataNodes;
656 1.1 oster writeParityNodes = &nodes[i]; i += numParityNodes;
657 1.1 oster xorNodes = &nodes[i]; i += numParityNodes;
658 1.1 oster termNode = &nodes[i]; i += 1;
659 1.1 oster if (lu_flag) {
660 1.1 oster unlockDataNodes = &nodes[i]; i += numDataNodes;
661 1.1 oster unlockParityNodes = &nodes[i]; i += numParityNodes;
662 1.1 oster }
663 1.1 oster else {
664 1.1 oster unlockDataNodes = unlockParityNodes = NULL;
665 1.1 oster }
666 1.1 oster if (nfaults == 2) {
667 1.1 oster readQNodes = &nodes[i]; i += numParityNodes;
668 1.1 oster writeQNodes = &nodes[i]; i += numParityNodes;
669 1.1 oster qNodes = &nodes[i]; i += numParityNodes;
670 1.1 oster if (lu_flag) {
671 1.1 oster unlockQNodes = &nodes[i]; i += numParityNodes;
672 1.1 oster }
673 1.1 oster else {
674 1.1 oster unlockQNodes = NULL;
675 1.1 oster }
676 1.1 oster }
677 1.1 oster else {
678 1.1 oster readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
679 1.1 oster }
680 1.1 oster RF_ASSERT(i == totalNumNodes);
681 1.1 oster
682 1.1 oster /*
683 1.1 oster * Step 3. initialize the nodes
684 1.1 oster */
685 1.1 oster /* initialize block node (Nil) */
686 1.1 oster nNodes = numDataNodes + (nfaults * numParityNodes);
687 1.1 oster rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
688 1.1 oster NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
689 1.1 oster
690 1.1 oster /* initialize commit node (Cmt) */
691 1.1 oster rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
692 1.1 oster NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);
693 1.1 oster
694 1.1 oster /* initialize terminate node (Trm) */
695 1.1 oster rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
696 1.1 oster NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
697 1.1 oster
698 1.1 oster /* initialize nodes which read old data (Rod) */
699 1.1 oster for (i = 0; i < numDataNodes; i++) {
700 1.1 oster rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
701 1.1 oster rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h,
702 1.1 oster "Rod", allocList);
703 1.1 oster RF_ASSERT(pda != NULL);
704 1.1 oster /* physical disk addr desc */
705 1.1 oster readDataNodes[i].params[0].p = pda;
706 1.1 oster /* buffer to hold old data */
707 1.1 oster readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
708 1.1 oster dag_h, pda, allocList);
709 1.1 oster readDataNodes[i].params[2].v = parityStripeID;
710 1.1 oster readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
711 1.1 oster lu_flag, 0, which_ru);
712 1.1 oster pda = pda->next;
713 1.1 oster for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
714 1.1 oster readDataNodes[i].propList[j] = NULL;
715 1.1 oster }
716 1.1 oster }
717 1.1 oster
718 1.1 oster /* initialize nodes which read old parity (Rop) */
719 1.1 oster pda = asmap->parityInfo; i = 0;
720 1.1 oster for (i = 0; i < numParityNodes; i++) {
721 1.1 oster RF_ASSERT(pda != NULL);
722 1.1 oster rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc,
723 1.1 oster rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4,
724 1.1 oster 0, dag_h, "Rop", allocList);
725 1.1 oster readParityNodes[i].params[0].p = pda;
726 1.1 oster /* buffer to hold old parity */
727 1.1 oster readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
728 1.1 oster dag_h, pda, allocList);
729 1.1 oster readParityNodes[i].params[2].v = parityStripeID;
730 1.1 oster readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
731 1.1 oster lu_flag, 0, which_ru);
732 1.1 oster pda = pda->next;
733 1.1 oster for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
734 1.1 oster readParityNodes[i].propList[0] = NULL;
735 1.1 oster }
736 1.1 oster }
737 1.1 oster
738 1.1 oster /* initialize nodes which read old Q (Roq) */
739 1.1 oster if (nfaults == 2) {
740 1.1 oster pda = asmap->qInfo;
741 1.1 oster for (i = 0; i < numParityNodes; i++) {
742 1.1 oster RF_ASSERT(pda != NULL);
743 1.1 oster rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
744 1.1 oster rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
745 1.1 oster readQNodes[i].params[0].p = pda;
746 1.1 oster /* buffer to hold old Q */
747 1.1 oster readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda,
748 1.1 oster allocList);
749 1.1 oster readQNodes[i].params[2].v = parityStripeID;
750 1.1 oster readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
751 1.1 oster lu_flag, 0, which_ru);
752 1.1 oster pda = pda->next;
753 1.1 oster for (j = 0; j < readQNodes[i].numSuccedents; j++) {
754 1.1 oster readQNodes[i].propList[0] = NULL;
755 1.1 oster }
756 1.1 oster }
757 1.1 oster }
758 1.1 oster
759 1.1 oster /* initialize nodes which write new data (Wnd) */
760 1.1 oster pda = asmap->physInfo;
761 1.1 oster for (i=0; i < numDataNodes; i++) {
762 1.1 oster RF_ASSERT(pda != NULL);
763 1.1 oster rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
764 1.1 oster rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
765 1.1 oster "Wnd", allocList);
766 1.1 oster /* physical disk addr desc */
767 1.1 oster writeDataNodes[i].params[0].p = pda;
768 1.1 oster /* buffer holding new data to be written */
769 1.1 oster writeDataNodes[i].params[1].p = pda->bufPtr;
770 1.1 oster writeDataNodes[i].params[2].v = parityStripeID;
771 1.1 oster writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
772 1.1 oster 0, 0, which_ru);
773 1.1 oster if (lu_flag) {
774 1.1 oster /* initialize node to unlock the disk queue */
775 1.1 oster rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
776 1.1 oster rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
777 1.1 oster "Und", allocList);
778 1.1 oster /* physical disk addr desc */
779 1.1 oster unlockDataNodes[i].params[0].p = pda;
780 1.1 oster unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
781 1.1 oster 0, lu_flag, which_ru);
782 1.1 oster }
783 1.1 oster pda = pda->next;
784 1.1 oster }
785 1.1 oster
786 1.1 oster /*
787 1.1 oster * Initialize nodes which compute new parity and Q.
788 1.1 oster */
789 1.1 oster /*
790 1.1 oster * We use the simple XOR func in the double-XOR case, and when
791 1.1 oster * we're accessing only a portion of one stripe unit. The distinction
792 1.1 oster * between the two is that the regular XOR func assumes that the targbuf
793 1.1 oster * is a full SU in size, and examines the pda associated with the buffer
794 1.1 oster * to decide where within the buffer to XOR the data, whereas
795 1.1 oster * the simple XOR func just XORs the data into the start of the buffer.
796 1.1 oster */
797 1.1 oster if ((numParityNodes==2) || ((numDataNodes == 1)
798 1.1 oster && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit)))
799 1.1 oster {
800 1.1 oster func = pfuncs->simple; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->SimpleName;
801 1.1 oster if (qfuncs) {
802 1.1 oster qfunc = qfuncs->simple;
803 1.1 oster qname = qfuncs->SimpleName;
804 1.1 oster }
805 1.1 oster else {
806 1.1 oster qfunc = NULL;
807 1.1 oster qname = NULL;
808 1.1 oster }
809 1.1 oster }
810 1.1 oster else {
811 1.1 oster func = pfuncs->regular;
812 1.1 oster undoFunc = rf_NullNodeUndoFunc;
813 1.1 oster name = pfuncs->RegularName;
814 1.1 oster if (qfuncs) {
815 1.1 oster qfunc = qfuncs->regular;
816 1.1 oster qname = qfuncs->RegularName;
817 1.1 oster }
818 1.1 oster else {
819 1.1 oster qfunc = NULL;
820 1.1 oster qname = NULL;
821 1.1 oster }
822 1.1 oster }
823 1.1 oster /*
824 1.1 oster * Initialize the xor nodes: params are {pda,buf}
825 1.1 oster * from {Rod,Wnd,Rop} nodes, and raidPtr
826 1.1 oster */
827 1.1 oster if (numParityNodes==2) {
828 1.1 oster /* double-xor case */
829 1.1 oster for (i=0; i < numParityNodes; i++) {
830 1.1 oster /* note: no wakeup func for xor */
831 1.1 oster rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL,
832 1.1 oster 1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList);
833 1.1 oster xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
834 1.1 oster xorNodes[i].params[0] = readDataNodes[i].params[0];
835 1.1 oster xorNodes[i].params[1] = readDataNodes[i].params[1];
836 1.1 oster xorNodes[i].params[2] = readParityNodes[i].params[0];
837 1.1 oster xorNodes[i].params[3] = readParityNodes[i].params[1];
838 1.1 oster xorNodes[i].params[4] = writeDataNodes[i].params[0];
839 1.1 oster xorNodes[i].params[5] = writeDataNodes[i].params[1];
840 1.1 oster xorNodes[i].params[6].p = raidPtr;
841 1.1 oster /* use old parity buf as target buf */
842 1.1 oster xorNodes[i].results[0] = readParityNodes[i].params[1].p;
843 1.1 oster if (nfaults == 2) {
844 1.1 oster /* note: no wakeup func for qor */
845 1.1 oster rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
846 1.1 oster (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList);
847 1.1 oster qNodes[i].params[0] = readDataNodes[i].params[0];
848 1.1 oster qNodes[i].params[1] = readDataNodes[i].params[1];
849 1.1 oster qNodes[i].params[2] = readQNodes[i].params[0];
850 1.1 oster qNodes[i].params[3] = readQNodes[i].params[1];
851 1.1 oster qNodes[i].params[4] = writeDataNodes[i].params[0];
852 1.1 oster qNodes[i].params[5] = writeDataNodes[i].params[1];
853 1.1 oster qNodes[i].params[6].p = raidPtr;
854 1.1 oster /* use old Q buf as target buf */
855 1.1 oster qNodes[i].results[0] = readQNodes[i].params[1].p;
856 1.1 oster }
857 1.1 oster }
858 1.1 oster }
859 1.1 oster else {
860 1.1 oster /* there is only one xor node in this case */
861 1.1 oster rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1,
862 1.1 oster (numDataNodes + numParityNodes),
863 1.1 oster (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
864 1.1 oster xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
865 1.1 oster for (i=0; i < numDataNodes + 1; i++) {
866 1.1 oster /* set up params related to Rod and Rop nodes */
867 1.1 oster xorNodes[0].params[2*i+0] = readDataNodes[i].params[0]; /* pda */
868 1.1 oster xorNodes[0].params[2*i+1] = readDataNodes[i].params[1]; /* buffer ptr */
869 1.1 oster }
870 1.1 oster for (i=0; i < numDataNodes; i++) {
871 1.1 oster /* set up params related to Wnd and Wnp nodes */
872 1.1 oster xorNodes[0].params[2*(numDataNodes+1+i)+0] = /* pda */
873 1.1 oster writeDataNodes[i].params[0];
874 1.1 oster xorNodes[0].params[2*(numDataNodes+1+i)+1] = /* buffer ptr */
875 1.1 oster writeDataNodes[i].params[1];
876 1.1 oster }
877 1.1 oster /* xor node needs to get at RAID information */
878 1.1 oster xorNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;
879 1.1 oster xorNodes[0].results[0] = readParityNodes[0].params[1].p;
880 1.1 oster if (nfaults == 2) {
881 1.1 oster rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
882 1.1 oster (numDataNodes + numParityNodes),
883 1.1 oster (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
884 1.1 oster qname, allocList);
885 1.1 oster for (i=0; i<numDataNodes; i++) {
886 1.1 oster /* set up params related to Rod */
887 1.1 oster qNodes[0].params[2*i+0] = readDataNodes[i].params[0]; /* pda */
888 1.1 oster qNodes[0].params[2*i+1] = readDataNodes[i].params[1]; /* buffer ptr */
889 1.1 oster }
890 1.1 oster /* and read old q */
891 1.1 oster qNodes[0].params[2*numDataNodes + 0] = /* pda */
892 1.1 oster readQNodes[0].params[0];
893 1.1 oster qNodes[0].params[2*numDataNodes + 1] = /* buffer ptr */
894 1.1 oster readQNodes[0].params[1];
895 1.1 oster for (i=0; i < numDataNodes; i++) {
896 1.1 oster /* set up params related to Wnd nodes */
897 1.1 oster qNodes[0].params[2*(numDataNodes+1+i)+0] = /* pda */
898 1.1 oster writeDataNodes[i].params[0];
899 1.1 oster qNodes[0].params[2*(numDataNodes+1+i)+1] = /* buffer ptr */
900 1.1 oster writeDataNodes[i].params[1];
901 1.1 oster }
902 1.1 oster /* xor node needs to get at RAID information */
903 1.1 oster qNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;
904 1.1 oster qNodes[0].results[0] = readQNodes[0].params[1].p;
905 1.1 oster }
906 1.1 oster }
907 1.1 oster
908 1.1 oster /* initialize nodes which write new parity (Wnp) */
909 1.1 oster pda = asmap->parityInfo;
910 1.1 oster for (i=0; i < numParityNodes; i++) {
911 1.1 oster rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
912 1.1 oster rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
913 1.1 oster "Wnp", allocList);
914 1.1 oster RF_ASSERT(pda != NULL);
915 1.1 oster writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr) filled in by xor node */
916 1.1 oster writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for parity write operation */
917 1.1 oster writeParityNodes[i].params[2].v = parityStripeID;
918 1.1 oster writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
919 1.1 oster 0, 0, which_ru);
920 1.1 oster if (lu_flag) {
921 1.1 oster /* initialize node to unlock the disk queue */
922 1.1 oster rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
923 1.1 oster rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
924 1.1 oster "Unp", allocList);
925 1.1 oster unlockParityNodes[i].params[0].p = pda; /* physical disk addr desc */
926 1.1 oster unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
927 1.1 oster 0, lu_flag, which_ru);
928 1.1 oster }
929 1.1 oster pda = pda->next;
930 1.1 oster }
931 1.1 oster
932 1.1 oster /* initialize nodes which write new Q (Wnq) */
933 1.1 oster if (nfaults == 2) {
934 1.1 oster pda = asmap->qInfo;
935 1.1 oster for (i=0; i < numParityNodes; i++) {
936 1.1 oster rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
937 1.1 oster rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
938 1.1 oster "Wnq", allocList);
939 1.1 oster RF_ASSERT(pda != NULL);
940 1.1 oster writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr) filled in by xor node */
941 1.1 oster writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for parity write operation */
942 1.1 oster writeQNodes[i].params[2].v = parityStripeID;
943 1.1 oster writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
944 1.1 oster 0, 0, which_ru);
945 1.1 oster if (lu_flag) {
946 1.1 oster /* initialize node to unlock the disk queue */
947 1.1 oster rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
948 1.1 oster rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
949 1.1 oster "Unq", allocList);
950 1.1 oster unlockQNodes[i].params[0].p = pda; /* physical disk addr desc */
951 1.1 oster unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
952 1.1 oster 0, lu_flag, which_ru);
953 1.1 oster }
954 1.1 oster pda = pda->next;
955 1.1 oster }
956 1.1 oster }
957 1.1 oster
958 1.1 oster /*
959 1.1 oster * Step 4. connect the nodes.
960 1.1 oster */
961 1.1 oster
962 1.1 oster /* connect header to block node */
963 1.1 oster dag_h->succedents[0] = blockNode;
964 1.1 oster
965 1.1 oster /* connect block node to read old data nodes */
966 1.1 oster RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
967 1.1 oster for (i = 0; i < numDataNodes; i++) {
968 1.1 oster blockNode->succedents[i] = &readDataNodes[i];
969 1.1 oster RF_ASSERT(readDataNodes[i].numAntecedents == 1);
970 1.1 oster readDataNodes[i].antecedents[0]= blockNode;
971 1.1 oster readDataNodes[i].antType[0] = rf_control;
972 1.1 oster }
973 1.1 oster
974 1.1 oster /* connect block node to read old parity nodes */
975 1.1 oster for (i = 0; i < numParityNodes; i++) {
976 1.1 oster blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
977 1.1 oster RF_ASSERT(readParityNodes[i].numAntecedents == 1);
978 1.1 oster readParityNodes[i].antecedents[0] = blockNode;
979 1.1 oster readParityNodes[i].antType[0] = rf_control;
980 1.1 oster }
981 1.1 oster
982 1.1 oster /* connect block node to read old Q nodes */
983 1.1 oster if (nfaults == 2) {
984 1.1 oster for (i = 0; i < numParityNodes; i++) {
985 1.1 oster blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
986 1.1 oster RF_ASSERT(readQNodes[i].numAntecedents == 1);
987 1.1 oster readQNodes[i].antecedents[0] = blockNode;
988 1.1 oster readQNodes[i].antType[0] = rf_control;
989 1.1 oster }
990 1.1 oster }
991 1.1 oster
992 1.1 oster /* connect read old data nodes to xor nodes */
993 1.1 oster for (i = 0; i < numDataNodes; i++) {
994 1.1 oster RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
995 1.1 oster for (j = 0; j < numParityNodes; j++){
996 1.1 oster RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
997 1.1 oster readDataNodes[i].succedents[j] = &xorNodes[j];
998 1.1 oster xorNodes[j].antecedents[i] = &readDataNodes[i];
999 1.1 oster xorNodes[j].antType[i] = rf_trueData;
1000 1.1 oster }
1001 1.1 oster }
1002 1.1 oster
1003 1.1 oster /* connect read old data nodes to q nodes */
1004 1.1 oster if (nfaults == 2) {
1005 1.1 oster for (i = 0; i < numDataNodes; i++) {
1006 1.1 oster for (j = 0; j < numParityNodes; j++) {
1007 1.1 oster RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
1008 1.1 oster readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
1009 1.1 oster qNodes[j].antecedents[i] = &readDataNodes[i];
1010 1.1 oster qNodes[j].antType[i] = rf_trueData;
1011 1.1 oster }
1012 1.1 oster }
1013 1.1 oster }
1014 1.1 oster
1015 1.1 oster /* connect read old parity nodes to xor nodes */
1016 1.1 oster for (i = 0; i < numParityNodes; i++) {
1017 1.1 oster RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
1018 1.1 oster for (j = 0; j < numParityNodes; j++) {
1019 1.1 oster readParityNodes[i].succedents[j] = &xorNodes[j];
1020 1.1 oster xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
1021 1.1 oster xorNodes[j].antType[numDataNodes + i] = rf_trueData;
1022 1.1 oster }
1023 1.1 oster }
1024 1.1 oster
1025 1.1 oster /* connect read old q nodes to q nodes */
1026 1.1 oster if (nfaults == 2) {
1027 1.1 oster for (i = 0; i < numParityNodes; i++) {
1028 1.1 oster RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
1029 1.1 oster for (j = 0; j < numParityNodes; j++) {
1030 1.1 oster readQNodes[i].succedents[j] = &qNodes[j];
1031 1.1 oster qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
1032 1.1 oster qNodes[j].antType[numDataNodes + i] = rf_trueData;
1033 1.1 oster }
1034 1.1 oster }
1035 1.1 oster }
1036 1.1 oster
1037 1.1 oster /* connect xor nodes to commit node */
1038 1.1 oster RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
1039 1.1 oster for (i = 0; i < numParityNodes; i++) {
1040 1.1 oster RF_ASSERT(xorNodes[i].numSuccedents == 1);
1041 1.1 oster xorNodes[i].succedents[0] = commitNode;
1042 1.1 oster commitNode->antecedents[i] = &xorNodes[i];
1043 1.1 oster commitNode->antType[i] = rf_control;
1044 1.1 oster }
1045 1.1 oster
1046 1.1 oster /* connect q nodes to commit node */
1047 1.1 oster if (nfaults == 2) {
1048 1.1 oster for (i = 0; i < numParityNodes; i++) {
1049 1.1 oster RF_ASSERT(qNodes[i].numSuccedents == 1);
1050 1.1 oster qNodes[i].succedents[0] = commitNode;
1051 1.1 oster commitNode->antecedents[i + numParityNodes] = &qNodes[i];
1052 1.1 oster commitNode->antType[i + numParityNodes] = rf_control;
1053 1.1 oster }
1054 1.1 oster }
1055 1.1 oster
1056 1.1 oster /* connect commit node to write nodes */
1057 1.1 oster RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
1058 1.1 oster for (i = 0; i < numDataNodes; i++) {
1059 1.1 oster RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
1060 1.1 oster commitNode->succedents[i] = &writeDataNodes[i];
1061 1.1 oster writeDataNodes[i].antecedents[0] = commitNode;
1062 1.1 oster writeDataNodes[i].antType[0] = rf_trueData;
1063 1.1 oster }
1064 1.1 oster for (i = 0; i < numParityNodes; i++) {
1065 1.1 oster RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
1066 1.1 oster commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
1067 1.1 oster writeParityNodes[i].antecedents[0] = commitNode;
1068 1.1 oster writeParityNodes[i].antType[0] = rf_trueData;
1069 1.1 oster }
1070 1.1 oster if (nfaults == 2) {
1071 1.1 oster for (i = 0; i < numParityNodes; i++) {
1072 1.1 oster RF_ASSERT(writeQNodes[i].numAntecedents == 1);
1073 1.1 oster commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
1074 1.1 oster writeQNodes[i].antecedents[0] = commitNode;
1075 1.1 oster writeQNodes[i].antType[0] = rf_trueData;
1076 1.1 oster }
1077 1.1 oster }
1078 1.1 oster
1079 1.1 oster RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1080 1.1 oster RF_ASSERT(termNode->numSuccedents == 0);
1081 1.1 oster for (i = 0; i < numDataNodes; i++) {
1082 1.1 oster if (lu_flag) {
1083 1.1 oster /* connect write new data nodes to unlock nodes */
1084 1.1 oster RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1085 1.1 oster RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1086 1.1 oster writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1087 1.1 oster unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1088 1.1 oster unlockDataNodes[i].antType[0] = rf_control;
1089 1.1 oster
1090 1.1 oster /* connect unlock nodes to term node */
1091 1.1 oster RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1092 1.1 oster unlockDataNodes[i].succedents[0] = termNode;
1093 1.1 oster termNode->antecedents[i] = &unlockDataNodes[i];
1094 1.1 oster termNode->antType[i] = rf_control;
1095 1.1 oster }
1096 1.1 oster else {
1097 1.1 oster /* connect write new data nodes to term node */
1098 1.1 oster RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1099 1.1 oster RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1100 1.1 oster writeDataNodes[i].succedents[0] = termNode;
1101 1.1 oster termNode->antecedents[i] = &writeDataNodes[i];
1102 1.1 oster termNode->antType[i] = rf_control;
1103 1.1 oster }
1104 1.1 oster }
1105 1.1 oster
1106 1.1 oster for (i = 0; i < numParityNodes; i++) {
1107 1.1 oster if (lu_flag) {
1108 1.1 oster /* connect write new parity nodes to unlock nodes */
1109 1.1 oster RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1110 1.1 oster RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1111 1.1 oster writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
1112 1.1 oster unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
1113 1.1 oster unlockParityNodes[i].antType[0] = rf_control;
1114 1.1 oster
1115 1.1 oster /* connect unlock nodes to term node */
1116 1.1 oster RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1117 1.1 oster unlockParityNodes[i].succedents[0] = termNode;
1118 1.1 oster termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
1119 1.1 oster termNode->antType[numDataNodes + i] = rf_control;
1120 1.1 oster }
1121 1.1 oster else {
1122 1.1 oster RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1123 1.1 oster writeParityNodes[i].succedents[0] = termNode;
1124 1.1 oster termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
1125 1.1 oster termNode->antType[numDataNodes + i] = rf_control;
1126 1.1 oster }
1127 1.1 oster }
1128 1.1 oster
1129 1.1 oster if (nfaults == 2) {
1130 1.1 oster for (i = 0; i < numParityNodes; i++) {
1131 1.1 oster if (lu_flag) {
1132 1.1 oster /* connect write new Q nodes to unlock nodes */
1133 1.1 oster RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1134 1.1 oster RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1135 1.1 oster writeQNodes[i].succedents[0] = &unlockQNodes[i];
1136 1.1 oster unlockQNodes[i].antecedents[0] = &writeQNodes[i];
1137 1.1 oster unlockQNodes[i].antType[0] = rf_control;
1138 1.1 oster
1139 1.1 oster /* connect unlock nodes to unblock node */
1140 1.1 oster RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1141 1.1 oster unlockQNodes[i].succedents[0] = termNode;
1142 1.1 oster termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
1143 1.1 oster termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1144 1.1 oster }
1145 1.1 oster else {
1146 1.1 oster RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1147 1.1 oster writeQNodes[i].succedents[0] = termNode;
1148 1.1 oster termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
1149 1.1 oster termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1150 1.1 oster }
1151 1.1 oster }
1152 1.1 oster }
1153 1.1 oster }
1154 1.1 oster
1155 1.1 oster
1156 1.1 oster /******************************************************************************
1157 1.1 oster * create a write graph (fault-free or degraded) for RAID level 1
1158 1.1 oster *
1159 1.1 oster * Hdr -> Commit -> Wpd -> Nil -> Trm
1160 1.1 oster * -> Wsd ->
1161 1.1 oster *
1162 1.1 oster * The "Wpd" node writes data to the primary copy in the mirror pair
1163 1.1 oster * The "Wsd" node writes data to the secondary copy in the mirror pair
1164 1.1 oster *
1165 1.1 oster * Parameters: raidPtr - description of the physical array
1166 1.1 oster * asmap - logical & physical addresses for this access
1167 1.1 oster * bp - buffer ptr (holds write data)
1168 1.1 oster * flags - general flags (e.g. disk locking)
1169 1.1 oster * allocList - list of memory allocated in DAG creation
1170 1.1 oster *****************************************************************************/
1171 1.1 oster
1172 1.1 oster void rf_CreateRaidOneWriteDAG(
1173 1.1 oster RF_Raid_t *raidPtr,
1174 1.1 oster RF_AccessStripeMap_t *asmap,
1175 1.1 oster RF_DagHeader_t *dag_h,
1176 1.1 oster void *bp,
1177 1.1 oster RF_RaidAccessFlags_t flags,
1178 1.1 oster RF_AllocListElem_t *allocList)
1179 1.1 oster {
1180 1.1 oster RF_DagNode_t *unblockNode, *termNode, *commitNode;
1181 1.1 oster RF_DagNode_t *nodes, *wndNode, *wmirNode;
1182 1.1 oster int nWndNodes, nWmirNodes, i;
1183 1.1 oster RF_ReconUnitNum_t which_ru;
1184 1.1 oster RF_PhysDiskAddr_t *pda, *pdaP;
1185 1.1 oster RF_StripeNum_t parityStripeID;
1186 1.1 oster
1187 1.1 oster parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1188 1.1 oster asmap->raidAddress, &which_ru);
1189 1.1 oster if (rf_dagDebug) {
1190 1.1 oster printf("[Creating RAID level 1 write DAG]\n");
1191 1.1 oster }
1192 1.1 oster dag_h->creator = "RaidOneWriteDAG";
1193 1.1 oster
1194 1.1 oster /* 2 implies access not SU aligned */
1195 1.1 oster nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
1196 1.1 oster nWndNodes = (asmap->physInfo->next) ? 2 : 1;
1197 1.1 oster
1198 1.1 oster /* alloc the Wnd nodes and the Wmir node */
1199 1.1 oster if (asmap->numDataFailed == 1)
1200 1.1 oster nWndNodes--;
1201 1.1 oster if (asmap->numParityFailed == 1)
1202 1.1 oster nWmirNodes--;
1203 1.1 oster
1204 1.1 oster /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock + terminator) */
1205 1.1 oster RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
1206 1.1 oster (RF_DagNode_t *), allocList);
1207 1.1 oster i = 0;
1208 1.1 oster wndNode = &nodes[i]; i += nWndNodes;
1209 1.1 oster wmirNode = &nodes[i]; i += nWmirNodes;
1210 1.1 oster commitNode = &nodes[i]; i += 1;
1211 1.1 oster unblockNode = &nodes[i]; i += 1;
1212 1.1 oster termNode = &nodes[i]; i += 1;
1213 1.1 oster RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
1214 1.1 oster
1215 1.1 oster /* this dag can commit immediately */
1216 1.1 oster dag_h->numCommitNodes = 1;
1217 1.1 oster dag_h->numCommits = 0;
1218 1.1 oster dag_h->numSuccedents = 1;
1219 1.1 oster
1220 1.1 oster /* initialize the commit, unblock, and term nodes */
1221 1.1 oster rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
1222 1.1 oster NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList);
1223 1.1 oster rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
1224 1.1 oster NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
1225 1.1 oster rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
1226 1.1 oster NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
1227 1.1 oster
1228 1.1 oster /* initialize the wnd nodes */
1229 1.1 oster if (nWndNodes > 0) {
1230 1.1 oster pda = asmap->physInfo;
1231 1.1 oster for (i = 0; i < nWndNodes; i++) {
1232 1.1 oster rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1233 1.1 oster rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
1234 1.1 oster RF_ASSERT(pda != NULL);
1235 1.1 oster wndNode[i].params[0].p = pda;
1236 1.1 oster wndNode[i].params[1].p = pda->bufPtr;
1237 1.1 oster wndNode[i].params[2].v = parityStripeID;
1238 1.1 oster wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1239 1.1 oster pda = pda->next;
1240 1.1 oster }
1241 1.1 oster RF_ASSERT(pda == NULL);
1242 1.1 oster }
1243 1.1 oster
1244 1.1 oster /* initialize the mirror nodes */
1245 1.1 oster if (nWmirNodes > 0) {
1246 1.1 oster pda = asmap->physInfo;
1247 1.1 oster pdaP = asmap->parityInfo;
1248 1.1 oster for (i = 0; i < nWmirNodes; i++) {
1249 1.1 oster rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1250 1.1 oster rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
1251 1.1 oster RF_ASSERT(pda != NULL);
1252 1.1 oster wmirNode[i].params[0].p = pdaP;
1253 1.1 oster wmirNode[i].params[1].p = pda->bufPtr;
1254 1.1 oster wmirNode[i].params[2].v = parityStripeID;
1255 1.1 oster wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1256 1.1 oster pda = pda->next;
1257 1.1 oster pdaP = pdaP->next;
1258 1.1 oster }
1259 1.1 oster RF_ASSERT(pda == NULL);
1260 1.1 oster RF_ASSERT(pdaP == NULL);
1261 1.1 oster }
1262 1.1 oster
1263 1.1 oster /* link the header node to the commit node */
1264 1.1 oster RF_ASSERT(dag_h->numSuccedents == 1);
1265 1.1 oster RF_ASSERT(commitNode->numAntecedents == 0);
1266 1.1 oster dag_h->succedents[0] = commitNode;
1267 1.1 oster
1268 1.1 oster /* link the commit node to the write nodes */
1269 1.1 oster RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
1270 1.1 oster for (i = 0; i < nWndNodes; i++) {
1271 1.1 oster RF_ASSERT(wndNode[i].numAntecedents == 1);
1272 1.1 oster commitNode->succedents[i] = &wndNode[i];
1273 1.1 oster wndNode[i].antecedents[0] = commitNode;
1274 1.1 oster wndNode[i].antType[0] = rf_control;
1275 1.1 oster }
1276 1.1 oster for (i = 0; i < nWmirNodes; i++) {
1277 1.1 oster RF_ASSERT(wmirNode[i].numAntecedents == 1);
1278 1.1 oster commitNode->succedents[i + nWndNodes] = &wmirNode[i];
1279 1.1 oster wmirNode[i].antecedents[0] = commitNode;
1280 1.1 oster wmirNode[i].antType[0] = rf_control;
1281 1.1 oster }
1282 1.1 oster
1283 1.1 oster /* link the write nodes to the unblock node */
1284 1.1 oster RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
1285 1.1 oster for (i = 0; i < nWndNodes; i++) {
1286 1.1 oster RF_ASSERT(wndNode[i].numSuccedents == 1);
1287 1.1 oster wndNode[i].succedents[0] = unblockNode;
1288 1.1 oster unblockNode->antecedents[i] = &wndNode[i];
1289 1.1 oster unblockNode->antType[i] = rf_control;
1290 1.1 oster }
1291 1.1 oster for (i = 0; i < nWmirNodes; i++) {
1292 1.1 oster RF_ASSERT(wmirNode[i].numSuccedents == 1);
1293 1.1 oster wmirNode[i].succedents[0] = unblockNode;
1294 1.1 oster unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
1295 1.1 oster unblockNode->antType[i + nWndNodes] = rf_control;
1296 1.1 oster }
1297 1.1 oster
1298 1.1 oster /* link the unblock node to the term node */
1299 1.1 oster RF_ASSERT(unblockNode->numSuccedents == 1);
1300 1.1 oster RF_ASSERT(termNode->numAntecedents == 1);
1301 1.1 oster RF_ASSERT(termNode->numSuccedents == 0);
1302 1.1 oster unblockNode->succedents[0] = termNode;
1303 1.1 oster termNode->antecedents[0] = unblockNode;
1304 1.1 oster termNode->antType[0] = rf_control;
1305 1.1 oster }
1306 1.1 oster
1307 1.1 oster
1308 1.1 oster
1309 1.1 oster /* DAGs which have no commit points.
1310 1.1 oster *
1311 1.1 oster * The following DAGs are used in forward and backward error recovery experiments.
1312 1.1 oster  * They are identical to the DAGs above this comment with the exception that
1313 1.1 oster  * the commit points have been removed.
1314 1.1 oster */
1315 1.1 oster
1316 1.1 oster
1317 1.1 oster
1318 1.1 oster void rf_CommonCreateLargeWriteDAGFwd(
1319 1.1 oster RF_Raid_t *raidPtr,
1320 1.1 oster RF_AccessStripeMap_t *asmap,
1321 1.1 oster RF_DagHeader_t *dag_h,
1322 1.1 oster void *bp,
1323 1.1 oster RF_RaidAccessFlags_t flags,
1324 1.1 oster RF_AllocListElem_t *allocList,
1325 1.1 oster int nfaults,
1326 1.1 oster int (*redFunc)(RF_DagNode_t *),
1327 1.1 oster int allowBufferRecycle)
1328 1.1 oster {
1329 1.1 oster RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
1330 1.1 oster RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
1331 1.1 oster int nWndNodes, nRodNodes, i, nodeNum, asmNum;
1332 1.1 oster RF_AccessStripeMapHeader_t *new_asm_h[2];
1333 1.1 oster RF_StripeNum_t parityStripeID;
1334 1.1 oster char *sosBuffer, *eosBuffer;
1335 1.1 oster RF_ReconUnitNum_t which_ru;
1336 1.1 oster RF_RaidLayout_t *layoutPtr;
1337 1.1 oster RF_PhysDiskAddr_t *pda;
1338 1.1 oster
1339 1.1 oster layoutPtr = &(raidPtr->Layout);
1340 1.1 oster parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
1341 1.1 oster
1342 1.1 oster if (rf_dagDebug)
1343 1.1 oster printf("[Creating large-write DAG]\n");
1344 1.1 oster dag_h->creator = "LargeWriteDAGFwd";
1345 1.1 oster
1346 1.1 oster dag_h->numCommitNodes = 0;
1347 1.1 oster dag_h->numCommits = 0;
1348 1.1 oster dag_h->numSuccedents = 1;
1349 1.1 oster
1350 1.1 oster /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
1351 1.1 oster nWndNodes = asmap->numStripeUnitsAccessed;
1352 1.1 oster RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1353 1.1 oster i = 0;
1354 1.1 oster wndNodes = &nodes[i]; i += nWndNodes;
1355 1.1 oster xorNode = &nodes[i]; i += 1;
1356 1.1 oster wnpNode = &nodes[i]; i += 1;
1357 1.1 oster blockNode = &nodes[i]; i += 1;
1358 1.1 oster syncNode = &nodes[i]; i += 1;
1359 1.1 oster termNode = &nodes[i]; i += 1;
1360 1.1 oster if (nfaults == 2) {
1361 1.1 oster wnqNode = &nodes[i]; i += 1;
1362 1.1 oster }
1363 1.1 oster else {
1364 1.1 oster wnqNode = NULL;
1365 1.1 oster }
1366 1.1 oster rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
1367 1.1 oster if (nRodNodes > 0) {
1368 1.1 oster RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1369 1.1 oster }
1370 1.1 oster else {
1371 1.1 oster rodNodes = NULL;
1372 1.1 oster }
1373 1.1 oster
1374 1.1 oster /* begin node initialization */
1375 1.1 oster if (nRodNodes > 0) {
1376 1.1 oster rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
1377 1.1 oster rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0, dag_h, "Nil", allocList);
1378 1.1 oster }
1379 1.1 oster else {
1380 1.1 oster rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
1381 1.1 oster rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h, "Nil", allocList);
1382 1.1 oster }
1383 1.1 oster
1384 1.1 oster rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
1385 1.1 oster
1386 1.1 oster /* initialize the Rod nodes */
1387 1.1 oster for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
1388 1.1 oster if (new_asm_h[asmNum]) {
1389 1.1 oster pda = new_asm_h[asmNum]->stripeMap->physInfo;
1390 1.1 oster while (pda) {
1391 1.1 oster rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList);
1392 1.1 oster rodNodes[nodeNum].params[0].p = pda;
1393 1.1 oster rodNodes[nodeNum].params[1].p = pda->bufPtr;
1394 1.1 oster rodNodes[nodeNum].params[2].v = parityStripeID;
1395 1.1 oster rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1396 1.1 oster nodeNum++;
1397 1.1 oster pda=pda->next;
1398 1.1 oster }
1399 1.1 oster }
1400 1.1 oster }
1401 1.1 oster RF_ASSERT(nodeNum == nRodNodes);
1402 1.1 oster
1403 1.1 oster /* initialize the wnd nodes */
1404 1.1 oster pda = asmap->physInfo;
1405 1.1 oster for (i=0; i < nWndNodes; i++) {
1406 1.1 oster rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1407 1.1 oster RF_ASSERT(pda != NULL);
1408 1.1 oster wndNodes[i].params[0].p = pda;
1409 1.1 oster wndNodes[i].params[1].p = pda->bufPtr;
1410 1.1 oster wndNodes[i].params[2].v = parityStripeID;
1411 1.1 oster wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1412 1.1 oster pda = pda->next;
1413 1.1 oster }
1414 1.1 oster
1415 1.1 oster /* initialize the redundancy node */
1416 1.1 oster rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
1417 1.1 oster xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
1418 1.1 oster for (i=0; i < nWndNodes; i++) {
1419 1.1 oster xorNode->params[2*i+0] = wndNodes[i].params[0]; /* pda */
1420 1.1 oster xorNode->params[2*i+1] = wndNodes[i].params[1]; /* buf ptr */
1421 1.1 oster }
1422 1.1 oster for (i=0; i < nRodNodes; i++) {
1423 1.1 oster xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0]; /* pda */
1424 1.1 oster xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1]; /* buf ptr */
1425 1.1 oster }
1426 1.1 oster xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr; /* xor node needs to get at RAID information */
1427 1.1 oster
1428 1.1 oster /* look for an Rod node that reads a complete SU. If none, alloc a buffer to receive the parity info.
1429 1.1 oster * Note that we can't use a new data buffer because it will not have gotten written when the xor occurs.
1430 1.1 oster */
1431 1.1 oster if (allowBufferRecycle) {
1432 1.1 oster for (i = 0; i < nRodNodes; i++)
1433 1.1 oster if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
1434 1.1 oster break;
1435 1.1 oster }
1436 1.1 oster if ((!allowBufferRecycle) || (i == nRodNodes)) {
1437 1.1 oster RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
1438 1.1 oster }
1439 1.1 oster else
1440 1.1 oster xorNode->results[0] = rodNodes[i].params[1].p;
1441 1.1 oster
1442 1.1 oster /* initialize the Wnp node */
1443 1.1 oster rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
1444 1.1 oster wnpNode->params[0].p = asmap->parityInfo;
1445 1.1 oster wnpNode->params[1].p = xorNode->results[0];
1446 1.1 oster wnpNode->params[2].v = parityStripeID;
1447 1.1 oster wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1448 1.1 oster RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must describe entire parity unit */
1449 1.1 oster
1450 1.1 oster if (nfaults == 2)
1451 1.1 oster {
1452 1.1 oster /* we never try to recycle a buffer for the Q calcuation in addition to the parity.
1453 1.1 oster This would cause two buffers to get smashed during the P and Q calculation,
1454 1.1 oster guaranteeing one would be wrong.
1455 1.1 oster */
1456 1.1 oster RF_CallocAndAdd(xorNode->results[1], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
1457 1.1 oster rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
1458 1.1 oster wnqNode->params[0].p = asmap->qInfo;
1459 1.1 oster wnqNode->params[1].p = xorNode->results[1];
1460 1.1 oster wnqNode->params[2].v = parityStripeID;
1461 1.1 oster wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1462 1.1 oster RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must describe entire parity unit */
1463 1.1 oster }
1464 1.1 oster
1465 1.1 oster
1466 1.1 oster /* connect nodes to form graph */
1467 1.1 oster
1468 1.1 oster /* connect dag header to block node */
1469 1.1 oster RF_ASSERT(blockNode->numAntecedents == 0);
1470 1.1 oster dag_h->succedents[0] = blockNode;
1471 1.1 oster
1472 1.1 oster if (nRodNodes > 0) {
1473 1.1 oster /* connect the block node to the Rod nodes */
1474 1.1 oster RF_ASSERT(blockNode->numSuccedents == nRodNodes);
1475 1.1 oster RF_ASSERT(syncNode->numAntecedents == nRodNodes);
1476 1.1 oster for (i = 0; i < nRodNodes; i++) {
1477 1.1 oster RF_ASSERT(rodNodes[i].numAntecedents == 1);
1478 1.1 oster blockNode->succedents[i] = &rodNodes[i];
1479 1.1 oster rodNodes[i].antecedents[0] = blockNode;
1480 1.1 oster rodNodes[i].antType[0] = rf_control;
1481 1.1 oster
1482 1.1 oster /* connect the Rod nodes to the Nil node */
1483 1.1 oster RF_ASSERT(rodNodes[i].numSuccedents == 1);
1484 1.1 oster rodNodes[i].succedents[0] = syncNode;
1485 1.1 oster syncNode->antecedents[i] = &rodNodes[i];
1486 1.1 oster syncNode->antType[i] = rf_trueData;
1487 1.1 oster }
1488 1.1 oster }
1489 1.1 oster else {
1490 1.1 oster /* connect the block node to the Nil node */
1491 1.1 oster RF_ASSERT(blockNode->numSuccedents == 1);
1492 1.1 oster RF_ASSERT(syncNode->numAntecedents == 1);
1493 1.1 oster blockNode->succedents[0] = syncNode;
1494 1.1 oster syncNode->antecedents[0] = blockNode;
1495 1.1 oster syncNode->antType[0] = rf_control;
1496 1.1 oster }
1497 1.1 oster
1498 1.1 oster /* connect the sync node to the Wnd nodes */
1499 1.1 oster RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
1500 1.1 oster for (i = 0; i < nWndNodes; i++) {
1501 1.1 oster RF_ASSERT(wndNodes->numAntecedents == 1);
1502 1.1 oster syncNode->succedents[i] = &wndNodes[i];
1503 1.1 oster wndNodes[i].antecedents[0] = syncNode;
1504 1.1 oster wndNodes[i].antType[0] = rf_control;
1505 1.1 oster }
1506 1.1 oster
1507 1.1 oster /* connect the sync node to the Xor node */
1508 1.1 oster RF_ASSERT(xorNode->numAntecedents == 1);
1509 1.1 oster syncNode->succedents[nWndNodes] = xorNode;
1510 1.1 oster xorNode->antecedents[0] = syncNode;
1511 1.1 oster xorNode->antType[0] = rf_control;
1512 1.1 oster
1513 1.1 oster /* connect the xor node to the write parity node */
1514 1.1 oster RF_ASSERT(xorNode->numSuccedents == nfaults);
1515 1.1 oster RF_ASSERT(wnpNode->numAntecedents == 1);
1516 1.1 oster xorNode->succedents[0] = wnpNode;
1517 1.1 oster wnpNode->antecedents[0]= xorNode;
1518 1.1 oster wnpNode->antType[0] = rf_trueData;
1519 1.1 oster if (nfaults == 2) {
1520 1.1 oster RF_ASSERT(wnqNode->numAntecedents == 1);
1521 1.1 oster xorNode->succedents[1] = wnqNode;
1522 1.1 oster wnqNode->antecedents[0] = xorNode;
1523 1.1 oster wnqNode->antType[0] = rf_trueData;
1524 1.1 oster }
1525 1.1 oster
1526 1.1 oster /* connect the write nodes to the term node */
1527 1.1 oster RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
1528 1.1 oster RF_ASSERT(termNode->numSuccedents == 0);
1529 1.1 oster for (i = 0; i < nWndNodes; i++) {
1530 1.1 oster RF_ASSERT(wndNodes->numSuccedents == 1);
1531 1.1 oster wndNodes[i].succedents[0] = termNode;
1532 1.1 oster termNode->antecedents[i] = &wndNodes[i];
1533 1.1 oster termNode->antType[i] = rf_control;
1534 1.1 oster }
1535 1.1 oster RF_ASSERT(wnpNode->numSuccedents == 1);
1536 1.1 oster wnpNode->succedents[0] = termNode;
1537 1.1 oster termNode->antecedents[nWndNodes] = wnpNode;
1538 1.1 oster termNode->antType[nWndNodes] = rf_control;
1539 1.1 oster if (nfaults == 2) {
1540 1.1 oster RF_ASSERT(wnqNode->numSuccedents == 1);
1541 1.1 oster wnqNode->succedents[0] = termNode;
1542 1.1 oster termNode->antecedents[nWndNodes + 1] = wnqNode;
1543 1.1 oster termNode->antType[nWndNodes + 1] = rf_control;
1544 1.1 oster }
1545 1.1 oster }
1546 1.1 oster
1547 1.1 oster
1548 1.1 oster /******************************************************************************
1549 1.1 oster *
1550 1.1 oster * creates a DAG to perform a small-write operation (either raid 5 or pq),
1551 1.1 oster * which is as follows:
1552 1.1 oster *
1553 1.1 oster * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
1554 1.1 oster * \- Rod X- Wnd [Und] -------/
1555 1.1 oster * [\- Rod X- Wnd [Und] ------/]
1556 1.1 oster * [\- Roq - Q --> Wnq [Unq]-/]
1557 1.1 oster *
1558 1.1 oster * Rop = read old parity
1559 1.1 oster * Rod = read old data
1560 1.1 oster * Roq = read old "q"
1561 1.1 oster * Cmt = commit node
1562 1.1 oster * Und = unlock data disk
1563 1.1 oster * Unp = unlock parity disk
1564 1.1 oster * Unq = unlock q disk
1565 1.1 oster * Wnp = write new parity
1566 1.1 oster * Wnd = write new data
1567 1.1 oster * Wnq = write new "q"
1568 1.1 oster * [ ] denotes optional segments in the graph
1569 1.1 oster *
1570 1.1 oster * Parameters: raidPtr - description of the physical array
1571 1.1 oster * asmap - logical & physical addresses for this access
1572 1.1 oster * bp - buffer ptr (holds write data)
1573 1.1 oster * flags - general flags (e.g. disk locking)
1574 1.1 oster * allocList - list of memory allocated in DAG creation
1575 1.1 oster * pfuncs - list of parity generating functions
1576 1.1 oster * qfuncs - list of q generating functions
1577 1.1 oster *
1578 1.1 oster * A null qfuncs indicates single fault tolerant
1579 1.1 oster *****************************************************************************/
1580 1.1 oster
1581 1.1 oster void rf_CommonCreateSmallWriteDAGFwd(
1582 1.1 oster RF_Raid_t *raidPtr,
1583 1.1 oster RF_AccessStripeMap_t *asmap,
1584 1.1 oster RF_DagHeader_t *dag_h,
1585 1.1 oster void *bp,
1586 1.1 oster RF_RaidAccessFlags_t flags,
1587 1.1 oster RF_AllocListElem_t *allocList,
1588 1.1 oster RF_RedFuncs_t *pfuncs,
1589 1.1 oster RF_RedFuncs_t *qfuncs)
1590 1.1 oster {
1591 1.1 oster RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
1592 1.1 oster RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
1593 1.1 oster RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
1594 1.1 oster RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
1595 1.1 oster int i, j, nNodes, totalNumNodes, lu_flag;
1596 1.1 oster RF_ReconUnitNum_t which_ru;
1597 1.1 oster int (*func)(RF_DagNode_t *), (*undoFunc)(RF_DagNode_t *);
1598 1.1 oster int (*qfunc)(RF_DagNode_t *);
1599 1.1 oster int numDataNodes, numParityNodes;
1600 1.1 oster RF_StripeNum_t parityStripeID;
1601 1.1 oster RF_PhysDiskAddr_t *pda;
1602 1.1 oster char *name, *qname;
1603 1.1 oster long nfaults;
1604 1.1 oster
1605 1.1 oster nfaults = qfuncs ? 2 : 1;
1606 1.1 oster lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
1607 1.1 oster
1608 1.1 oster parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
1609 1.1 oster pda = asmap->physInfo;
1610 1.1 oster numDataNodes = asmap->numStripeUnitsAccessed;
1611 1.1 oster numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
1612 1.1 oster
1613 1.1 oster if (rf_dagDebug) printf("[Creating small-write DAG]\n");
1614 1.1 oster RF_ASSERT(numDataNodes > 0);
1615 1.1 oster dag_h->creator = "SmallWriteDAGFwd";
1616 1.1 oster
1617 1.1 oster dag_h->numCommitNodes = 0;
1618 1.1 oster dag_h->numCommits = 0;
1619 1.1 oster dag_h->numSuccedents = 1;
1620 1.1 oster
1621 1.1 oster qfunc = NULL;
1622 1.1 oster qname = NULL;
1623 1.1 oster
1624 1.1 oster /* DAG creation occurs in four steps:
1625 1.1 oster 1. count the number of nodes in the DAG
1626 1.1 oster 2. create the nodes
1627 1.1 oster 3. initialize the nodes
1628 1.1 oster 4. connect the nodes
1629 1.1 oster */
1630 1.1 oster
1631 1.1 oster /* Step 1. compute number of nodes in the graph */
1632 1.1 oster
1633 1.1 oster /* number of nodes:
1634 1.1 oster a read and write for each data unit
1635 1.1 oster a redundancy computation node for each parity node (nfaults * nparity)
1636 1.1 oster a read and write for each parity unit
1637 1.1 oster a block node
1638 1.1 oster a terminate node
1639 1.1 oster if atomic RMW
1640 1.1 oster an unlock node for each data unit, redundancy unit
1641 1.1 oster */
1642 1.1 oster totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) + (nfaults * 2 * numParityNodes) + 2;
1643 1.1 oster if (lu_flag)
1644 1.1 oster totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
1645 1.1 oster
1646 1.1 oster
1647 1.1 oster /* Step 2. create the nodes */
1648 1.1 oster RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1649 1.1 oster i = 0;
1650 1.1 oster blockNode = &nodes[i]; i += 1;
1651 1.1 oster readDataNodes = &nodes[i]; i += numDataNodes;
1652 1.1 oster readParityNodes = &nodes[i]; i += numParityNodes;
1653 1.1 oster writeDataNodes = &nodes[i]; i += numDataNodes;
1654 1.1 oster writeParityNodes = &nodes[i]; i += numParityNodes;
1655 1.1 oster xorNodes = &nodes[i]; i += numParityNodes;
1656 1.1 oster termNode = &nodes[i]; i += 1;
1657 1.1 oster if (lu_flag) {
1658 1.1 oster unlockDataNodes = &nodes[i]; i += numDataNodes;
1659 1.1 oster unlockParityNodes = &nodes[i]; i += numParityNodes;
1660 1.1 oster }
1661 1.1 oster else {
1662 1.1 oster unlockDataNodes = unlockParityNodes = NULL;
1663 1.1 oster }
1664 1.1 oster if (nfaults == 2) {
1665 1.1 oster readQNodes = &nodes[i]; i += numParityNodes;
1666 1.1 oster writeQNodes = &nodes[i]; i += numParityNodes;
1667 1.1 oster qNodes = &nodes[i]; i += numParityNodes;
1668 1.1 oster if (lu_flag) {
1669 1.1 oster unlockQNodes = &nodes[i]; i += numParityNodes;
1670 1.1 oster }
1671 1.1 oster else {
1672 1.1 oster unlockQNodes = NULL;
1673 1.1 oster }
1674 1.1 oster }
1675 1.1 oster else {
1676 1.1 oster readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
1677 1.1 oster }
1678 1.1 oster RF_ASSERT(i == totalNumNodes);
1679 1.1 oster
1680 1.1 oster /* Step 3. initialize the nodes */
1681 1.1 oster /* initialize block node (Nil) */
1682 1.1 oster nNodes = numDataNodes + (nfaults * numParityNodes);
1683 1.1 oster rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
1684 1.1 oster
1685 1.1 oster /* initialize terminate node (Trm) */
1686 1.1 oster rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
1687 1.1 oster
1688 1.1 oster /* initialize nodes which read old data (Rod) */
1689 1.1 oster for (i = 0; i < numDataNodes; i++) {
1690 1.1 oster rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h, "Rod", allocList);
1691 1.1 oster RF_ASSERT(pda != NULL);
1692 1.1 oster readDataNodes[i].params[0].p = pda; /* physical disk addr desc */
1693 1.1 oster readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old data */
1694 1.1 oster readDataNodes[i].params[2].v = parityStripeID;
1695 1.1 oster readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1696 1.1 oster pda=pda->next;
1697 1.1 oster for (j = 0; j < readDataNodes[i].numSuccedents; j++)
1698 1.1 oster readDataNodes[i].propList[j] = NULL;
1699 1.1 oster }
1700 1.1 oster
1701 1.1 oster /* initialize nodes which read old parity (Rop) */
1702 1.1 oster pda = asmap->parityInfo; i = 0;
1703 1.1 oster for (i = 0; i < numParityNodes; i++) {
1704 1.1 oster RF_ASSERT(pda != NULL);
1705 1.1 oster rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
1706 1.1 oster readParityNodes[i].params[0].p = pda;
1707 1.1 oster readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old parity */
1708 1.1 oster readParityNodes[i].params[2].v = parityStripeID;
1709 1.1 oster readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1710 1.1 oster for (j = 0; j < readParityNodes[i].numSuccedents; j++)
1711 1.1 oster readParityNodes[i].propList[0] = NULL;
1712 1.1 oster pda=pda->next;
1713 1.1 oster }
1714 1.1 oster
1715 1.1 oster /* initialize nodes which read old Q (Roq) */
1716 1.1 oster if (nfaults == 2)
1717 1.1 oster {
1718 1.1 oster pda = asmap->qInfo;
1719 1.1 oster for (i = 0; i < numParityNodes; i++) {
1720 1.1 oster RF_ASSERT(pda != NULL);
1721 1.1 oster rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
1722 1.1 oster readQNodes[i].params[0].p = pda;
1723 1.1 oster readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old Q */
1724 1.1 oster readQNodes[i].params[2].v = parityStripeID;
1725 1.1 oster readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1726 1.1 oster for (j = 0; j < readQNodes[i].numSuccedents; j++)
1727 1.1 oster readQNodes[i].propList[0] = NULL;
1728 1.1 oster pda=pda->next;
1729 1.1 oster }
1730 1.1 oster }
1731 1.1 oster
1732 1.1 oster /* initialize nodes which write new data (Wnd) */
1733 1.1 oster pda = asmap->physInfo;
1734 1.1 oster for (i=0; i < numDataNodes; i++) {
1735 1.1 oster RF_ASSERT(pda != NULL);
1736 1.1 oster rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1737 1.1 oster writeDataNodes[i].params[0].p = pda; /* physical disk addr desc */
1738 1.1 oster writeDataNodes[i].params[1].p = pda->bufPtr; /* buffer holding new data to be written */
1739 1.1 oster writeDataNodes[i].params[2].v = parityStripeID;
1740 1.1 oster writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1741 1.1 oster
1742 1.1 oster if (lu_flag) {
1743 1.1 oster /* initialize node to unlock the disk queue */
1744 1.1 oster rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
1745 1.1 oster unlockDataNodes[i].params[0].p = pda; /* physical disk addr desc */
1746 1.1 oster unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1747 1.1 oster }
1748 1.1 oster
1749 1.1 oster pda = pda->next;
1750 1.1 oster }
1751 1.1 oster
1752 1.1 oster
1753 1.1 oster /* initialize nodes which compute new parity and Q */
1754 1.1 oster /* we use the simple XOR func in the double-XOR case, and when we're accessing only a portion of one stripe unit.
1755 1.1 oster * the distinction between the two is that the regular XOR func assumes that the targbuf is a full SU in size,
1756 1.1 oster * and examines the pda associated with the buffer to decide where within the buffer to XOR the data, whereas
1757 1.1 oster * the simple XOR func just XORs the data into the start of the buffer.
1758 1.1 oster */
1759 1.1 oster if ((numParityNodes==2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
1760 1.1 oster func = pfuncs->simple; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->SimpleName;
1761 1.1 oster if (qfuncs) {
1762 1.1 oster qfunc = qfuncs->simple;
1763 1.1 oster qname = qfuncs->SimpleName;
1764 1.1 oster }
1765 1.1 oster }
1766 1.1 oster else {
1767 1.1 oster func = pfuncs->regular; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->RegularName;
1768 1.1 oster if (qfuncs) { qfunc = qfuncs->regular; qname = qfuncs->RegularName;}
1769 1.1 oster }
1770 1.1 oster /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop} nodes, and raidPtr */
1771 1.1 oster if (numParityNodes==2) { /* double-xor case */
1772 1.1 oster for (i=0; i < numParityNodes; i++) {
1773 1.1 oster rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, name, allocList); /* no wakeup func for xor */
1774 1.1 oster xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
1775 1.1 oster xorNodes[i].params[0] = readDataNodes[i].params[0];
1776 1.1 oster xorNodes[i].params[1] = readDataNodes[i].params[1];
1777 1.1 oster xorNodes[i].params[2] = readParityNodes[i].params[0];
1778 1.1 oster xorNodes[i].params[3] = readParityNodes[i].params[1];
1779 1.1 oster xorNodes[i].params[4] = writeDataNodes[i].params[0];
1780 1.1 oster xorNodes[i].params[5] = writeDataNodes[i].params[1];
1781 1.1 oster xorNodes[i].params[6].p = raidPtr;
1782 1.1 oster xorNodes[i].results[0] = readParityNodes[i].params[1].p; /* use old parity buf as target buf */
1783 1.1 oster if (nfaults==2)
1784 1.1 oster {
1785 1.1 oster rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, qname, allocList); /* no wakeup func for xor */
1786 1.1 oster qNodes[i].params[0] = readDataNodes[i].params[0];
1787 1.1 oster qNodes[i].params[1] = readDataNodes[i].params[1];
1788 1.1 oster qNodes[i].params[2] = readQNodes[i].params[0];
1789 1.1 oster qNodes[i].params[3] = readQNodes[i].params[1];
1790 1.1 oster qNodes[i].params[4] = writeDataNodes[i].params[0];
1791 1.1 oster qNodes[i].params[5] = writeDataNodes[i].params[1];
1792 1.1 oster qNodes[i].params[6].p = raidPtr;
1793 1.1 oster qNodes[i].results[0] = readQNodes[i].params[1].p; /* use old Q buf as target buf */
1794 1.1 oster }
1795 1.1 oster }
1796 1.1 oster }
1797 1.1 oster else {
1798 1.1 oster /* there is only one xor node in this case */
1799 1.1 oster rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
1800 1.1 oster xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
1801 1.1 oster for (i=0; i < numDataNodes + 1; i++) {
1802 1.1 oster /* set up params related to Rod and Rop nodes */
1803 1.1 oster xorNodes[0].params[2*i+0] = readDataNodes[i].params[0]; /* pda */
1804 1.1 oster xorNodes[0].params[2*i+1] = readDataNodes[i].params[1]; /* buffer pointer */
1805 1.1 oster }
1806 1.1 oster for (i=0; i < numDataNodes; i++) {
1807 1.1 oster /* set up params related to Wnd and Wnp nodes */
1808 1.1 oster xorNodes[0].params[2*(numDataNodes+1+i)+0] = writeDataNodes[i].params[0]; /* pda */
1809 1.1 oster xorNodes[0].params[2*(numDataNodes+1+i)+1] = writeDataNodes[i].params[1]; /* buffer pointer */
1810 1.1 oster }
1811 1.1 oster xorNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr; /* xor node needs to get at RAID information */
1812 1.1 oster xorNodes[0].results[0] = readParityNodes[0].params[1].p;
1813 1.1 oster if (nfaults==2)
1814 1.1 oster {
1815 1.1 oster rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, qname, allocList);
1816 1.1 oster for (i=0; i<numDataNodes; i++) {
1817 1.1 oster /* set up params related to Rod */
1818 1.1 oster qNodes[0].params[2*i+0] = readDataNodes[i].params[0]; /* pda */
1819 1.1 oster qNodes[0].params[2*i+1] = readDataNodes[i].params[1]; /* buffer pointer */
1820 1.1 oster }
1821 1.1 oster /* and read old q */
1822 1.1 oster qNodes[0].params[2*numDataNodes + 0] = readQNodes[0].params[0]; /* pda */
1823 1.1 oster qNodes[0].params[2*numDataNodes + 1] = readQNodes[0].params[1]; /* buffer pointer */
1824 1.1 oster for (i=0; i < numDataNodes; i++) {
1825 1.1 oster /* set up params related to Wnd nodes */
1826 1.1 oster qNodes[0].params[2*(numDataNodes+1+i)+0] = writeDataNodes[i].params[0]; /* pda */
1827 1.1 oster qNodes[0].params[2*(numDataNodes+1+i)+1] = writeDataNodes[i].params[1]; /* buffer pointer */
1828 1.1 oster }
1829 1.1 oster qNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr; /* xor node needs to get at RAID information */
1830 1.1 oster qNodes[0].results[0] = readQNodes[0].params[1].p;
1831 1.1 oster }
1832 1.1 oster }
1833 1.1 oster
1834 1.1 oster /* initialize nodes which write new parity (Wnp) */
1835 1.1 oster pda = asmap->parityInfo;
1836 1.1 oster for (i=0; i < numParityNodes; i++) {
1837 1.1 oster rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnp", allocList);
1838 1.1 oster RF_ASSERT(pda != NULL);
1839 1.1 oster writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr) filled in by xor node */
1840 1.1 oster writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for parity write operation */
1841 1.1 oster writeParityNodes[i].params[2].v = parityStripeID;
1842 1.1 oster writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1843 1.1 oster
1844 1.1 oster if (lu_flag) {
1845 1.1 oster /* initialize node to unlock the disk queue */
1846 1.1 oster rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp", allocList);
1847 1.1 oster unlockParityNodes[i].params[0].p = pda; /* physical disk addr desc */
1848 1.1 oster unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1849 1.1 oster }
1850 1.1 oster
1851 1.1 oster pda = pda->next;
1852 1.1 oster }
1853 1.1 oster
1854 1.1 oster /* initialize nodes which write new Q (Wnq) */
1855 1.1 oster if (nfaults == 2)
1856 1.1 oster {
1857 1.1 oster pda = asmap->qInfo;
1858 1.1 oster for (i=0; i < numParityNodes; i++) {
1859 1.1 oster rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnq", allocList);
1860 1.1 oster RF_ASSERT(pda != NULL);
1861 1.1 oster writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr) filled in by xor node */
1862 1.1 oster writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for parity write operation */
1863 1.1 oster writeQNodes[i].params[2].v = parityStripeID;
1864 1.1 oster writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1865 1.1 oster
1866 1.1 oster if (lu_flag) {
1867 1.1 oster /* initialize node to unlock the disk queue */
1868 1.1 oster rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unq", allocList);
1869 1.1 oster unlockQNodes[i].params[0].p = pda; /* physical disk addr desc */
1870 1.1 oster unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1871 1.1 oster }
1872 1.1 oster
1873 1.1 oster pda = pda->next;
1874 1.1 oster }
1875 1.1 oster }
1876 1.1 oster
1877 1.1 oster /* Step 4. connect the nodes */
1878 1.1 oster
1879 1.1 oster /* connect header to block node */
1880 1.1 oster dag_h->succedents[0] = blockNode;
1881 1.1 oster
1882 1.1 oster /* connect block node to read old data nodes */
1883 1.1 oster RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
1884 1.1 oster for (i = 0; i < numDataNodes; i++) {
1885 1.1 oster blockNode->succedents[i] = &readDataNodes[i];
1886 1.1 oster RF_ASSERT(readDataNodes[i].numAntecedents == 1);
1887 1.1 oster readDataNodes[i].antecedents[0]= blockNode;
1888 1.1 oster readDataNodes[i].antType[0] = rf_control;
1889 1.1 oster }
1890 1.1 oster
1891 1.1 oster /* connect block node to read old parity nodes */
1892 1.1 oster for (i = 0; i < numParityNodes; i++) {
1893 1.1 oster blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
1894 1.1 oster RF_ASSERT(readParityNodes[i].numAntecedents == 1);
1895 1.1 oster readParityNodes[i].antecedents[0] = blockNode;
1896 1.1 oster readParityNodes[i].antType[0] = rf_control;
1897 1.1 oster }
1898 1.1 oster
1899 1.1 oster /* connect block node to read old Q nodes */
1900 1.1 oster if (nfaults == 2)
1901 1.1 oster for (i = 0; i < numParityNodes; i++) {
1902 1.1 oster blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
1903 1.1 oster RF_ASSERT(readQNodes[i].numAntecedents == 1);
1904 1.1 oster readQNodes[i].antecedents[0] = blockNode;
1905 1.1 oster readQNodes[i].antType[0] = rf_control;
1906 1.1 oster }
1907 1.1 oster
1908 1.1 oster /* connect read old data nodes to write new data nodes */
1909 1.1 oster for (i = 0; i < numDataNodes; i++) {
1910 1.1 oster RF_ASSERT(readDataNodes[i].numSuccedents == ((nfaults * numParityNodes) + 1));
1911 1.1 oster RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
1912 1.1 oster readDataNodes[i].succedents[0] = &writeDataNodes[i];
1913 1.1 oster writeDataNodes[i].antecedents[0] = &readDataNodes[i];
1914 1.1 oster writeDataNodes[i].antType[0] = rf_antiData;
1915 1.1 oster }
1916 1.1 oster
1917 1.1 oster /* connect read old data nodes to xor nodes */
1918 1.1 oster for (i = 0; i < numDataNodes; i++) {
1919 1.1 oster for (j = 0; j < numParityNodes; j++){
1920 1.1 oster RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
1921 1.1 oster readDataNodes[i].succedents[1 + j] = &xorNodes[j];
1922 1.1 oster xorNodes[j].antecedents[i] = &readDataNodes[i];
1923 1.1 oster xorNodes[j].antType[i] = rf_trueData;
1924 1.1 oster }
1925 1.1 oster }
1926 1.1 oster
1927 1.1 oster /* connect read old data nodes to q nodes */
1928 1.1 oster if (nfaults == 2)
1929 1.1 oster for (i = 0; i < numDataNodes; i++)
1930 1.1 oster for (j = 0; j < numParityNodes; j++){
1931 1.1 oster RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
1932 1.1 oster readDataNodes[i].succedents[1 + numParityNodes + j] = &qNodes[j];
1933 1.1 oster qNodes[j].antecedents[i] = &readDataNodes[i];
1934 1.1 oster qNodes[j].antType[i] = rf_trueData;
1935 1.1 oster }
1936 1.1 oster
1937 1.1 oster /* connect read old parity nodes to xor nodes */
1938 1.1 oster for (i = 0; i < numParityNodes; i++) {
1939 1.1 oster for (j = 0; j < numParityNodes; j++) {
1940 1.1 oster RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
1941 1.1 oster readParityNodes[i].succedents[j] = &xorNodes[j];
1942 1.1 oster xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
1943 1.1 oster xorNodes[j].antType[numDataNodes + i] = rf_trueData;
1944 1.1 oster }
1945 1.1 oster }
1946 1.1 oster
1947 1.1 oster /* connect read old q nodes to q nodes */
1948 1.1 oster if (nfaults == 2)
1949 1.1 oster for (i = 0; i < numParityNodes; i++) {
1950 1.1 oster for (j = 0; j < numParityNodes; j++) {
1951 1.1 oster RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
1952 1.1 oster readQNodes[i].succedents[j] = &qNodes[j];
1953 1.1 oster qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
1954 1.1 oster qNodes[j].antType[numDataNodes + i] = rf_trueData;
1955 1.1 oster }
1956 1.1 oster }
1957 1.1 oster
1958 1.1 oster /* connect xor nodes to the write new parity nodes */
1959 1.1 oster for (i = 0; i < numParityNodes; i++) {
1960 1.1 oster RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
1961 1.1 oster for (j = 0; j < numParityNodes; j++) {
1962 1.1 oster RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
1963 1.1 oster xorNodes[i].succedents[j] = &writeParityNodes[j];
1964 1.1 oster writeParityNodes[j].antecedents[i] = &xorNodes[i];
1965 1.1 oster writeParityNodes[j].antType[i] = rf_trueData;
1966 1.1 oster }
1967 1.1 oster }
1968 1.1 oster
1969 1.1 oster /* connect q nodes to the write new q nodes */
1970 1.1 oster if (nfaults == 2)
1971 1.1 oster for (i = 0; i < numParityNodes; i++) {
1972 1.1 oster RF_ASSERT(writeQNodes[i].numAntecedents == numParityNodes);
1973 1.1 oster for (j = 0; j < numParityNodes; j++) {
1974 1.1 oster RF_ASSERT(qNodes[j].numSuccedents == 1);
1975 1.1 oster qNodes[i].succedents[j] = &writeQNodes[j];
1976 1.1 oster writeQNodes[j].antecedents[i] = &qNodes[i];
1977 1.1 oster writeQNodes[j].antType[i] = rf_trueData;
1978 1.1 oster }
1979 1.1 oster }
1980 1.1 oster
1981 1.1 oster RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1982 1.1 oster RF_ASSERT(termNode->numSuccedents == 0);
1983 1.1 oster for (i = 0; i < numDataNodes; i++) {
1984 1.1 oster if (lu_flag) {
1985 1.1 oster /* connect write new data nodes to unlock nodes */
1986 1.1 oster RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1987 1.1 oster RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1988 1.1 oster writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1989 1.1 oster unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1990 1.1 oster unlockDataNodes[i].antType[0] = rf_control;
1991 1.1 oster
1992 1.1 oster /* connect unlock nodes to term node */
1993 1.1 oster RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1994 1.1 oster unlockDataNodes[i].succedents[0] = termNode;
1995 1.1 oster termNode->antecedents[i] = &unlockDataNodes[i];
1996 1.1 oster termNode->antType[i] = rf_control;
1997 1.1 oster }
1998 1.1 oster else {
1999 1.1 oster /* connect write new data nodes to term node */
2000 1.1 oster RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
2001 1.1 oster RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
2002 1.1 oster writeDataNodes[i].succedents[0] = termNode;
2003 1.1 oster termNode->antecedents[i] = &writeDataNodes[i];
2004 1.1 oster termNode->antType[i] = rf_control;
2005 1.1 oster }
2006 1.1 oster }
2007 1.1 oster
2008 1.1 oster for (i = 0; i < numParityNodes; i++) {
2009 1.1 oster if (lu_flag) {
2010 1.1 oster /* connect write new parity nodes to unlock nodes */
2011 1.1 oster RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2012 1.1 oster RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
2013 1.1 oster writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
2014 1.1 oster unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
2015 1.1 oster unlockParityNodes[i].antType[0] = rf_control;
2016 1.1 oster
2017 1.1 oster /* connect unlock nodes to term node */
2018 1.1 oster RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
2019 1.1 oster unlockParityNodes[i].succedents[0] = termNode;
2020 1.1 oster termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
2021 1.1 oster termNode->antType[numDataNodes + i] = rf_control;
2022 1.1 oster }
2023 1.1 oster else {
2024 1.1 oster RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2025 1.1 oster writeParityNodes[i].succedents[0] = termNode;
2026 1.1 oster termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
2027 1.1 oster termNode->antType[numDataNodes + i] = rf_control;
2028 1.1 oster }
2029 1.1 oster }
2030 1.1 oster
2031 1.1 oster if (nfaults == 2)
2032 1.1 oster for (i = 0; i < numParityNodes; i++) {
2033 1.1 oster if (lu_flag) {
2034 1.1 oster /* connect write new Q nodes to unlock nodes */
2035 1.1 oster RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2036 1.1 oster RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
2037 1.1 oster writeQNodes[i].succedents[0] = &unlockQNodes[i];
2038 1.1 oster unlockQNodes[i].antecedents[0] = &writeQNodes[i];
2039 1.1 oster unlockQNodes[i].antType[0] = rf_control;
2040 1.1 oster
2041 1.1 oster /* connect unlock nodes to unblock node */
2042 1.1 oster RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
2043 1.1 oster unlockQNodes[i].succedents[0] = termNode;
2044 1.1 oster termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
2045 1.1 oster termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
2046 1.1 oster }
2047 1.1 oster else {
2048 1.1 oster RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2049 1.1 oster writeQNodes[i].succedents[0] = termNode;
2050 1.1 oster termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
2051 1.1 oster termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
2052 1.1 oster }
2053 1.1 oster }
2054 1.1 oster }
2055 1.1 oster
2056 1.1 oster
2057 1.1 oster
2058 1.1 oster /******************************************************************************
2059 1.1 oster * create a write graph (fault-free or degraded) for RAID level 1
2060 1.1 oster *
2061 1.1 oster * Hdr Nil -> Wpd -> Nil -> Trm
2062 1.1 oster * Nil -> Wsd ->
2063 1.1 oster *
2064 1.1 oster * The "Wpd" node writes data to the primary copy in the mirror pair
2065 1.1 oster * The "Wsd" node writes data to the secondary copy in the mirror pair
2066 1.1 oster *
2067 1.1 oster * Parameters: raidPtr - description of the physical array
2068 1.1 oster * asmap - logical & physical addresses for this access
2069 1.1 oster * bp - buffer ptr (holds write data)
2070 1.1 oster * flags - general flags (e.g. disk locking)
2071 1.1 oster * allocList - list of memory allocated in DAG creation
2072 1.1 oster *****************************************************************************/
2073 1.1 oster
2074 1.1 oster void rf_CreateRaidOneWriteDAGFwd(
2075 1.1 oster RF_Raid_t *raidPtr,
2076 1.1 oster RF_AccessStripeMap_t *asmap,
2077 1.1 oster RF_DagHeader_t *dag_h,
2078 1.1 oster void *bp,
2079 1.1 oster RF_RaidAccessFlags_t flags,
2080 1.1 oster RF_AllocListElem_t *allocList)
2081 1.1 oster {
2082 1.1 oster RF_DagNode_t *blockNode, *unblockNode, *termNode;
2083 1.1 oster RF_DagNode_t *nodes, *wndNode, *wmirNode;
2084 1.1 oster int nWndNodes, nWmirNodes, i;
2085 1.1 oster RF_ReconUnitNum_t which_ru;
2086 1.1 oster RF_PhysDiskAddr_t *pda, *pdaP;
2087 1.1 oster RF_StripeNum_t parityStripeID;
2088 1.1 oster
2089 1.1 oster parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
2090 1.1 oster asmap->raidAddress, &which_ru);
2091 1.1 oster if (rf_dagDebug) {
2092 1.1 oster printf("[Creating RAID level 1 write DAG]\n");
2093 1.1 oster }
2094 1.1 oster
2095 1.1 oster nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; /* 2 implies access not SU aligned */
2096 1.1 oster nWndNodes = (asmap->physInfo->next) ? 2 : 1;
2097 1.1 oster
2098 1.1 oster /* alloc the Wnd nodes and the Wmir node */
2099 1.1 oster if (asmap->numDataFailed == 1)
2100 1.1 oster nWndNodes--;
2101 1.1 oster if (asmap->numParityFailed == 1)
2102 1.1 oster nWmirNodes--;
2103 1.1 oster
2104 1.1 oster /* total number of nodes = nWndNodes + nWmirNodes + (block + unblock + terminator) */
2105 1.1 oster RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
2106 1.1 oster i = 0;
2107 1.1 oster wndNode = &nodes[i]; i += nWndNodes;
2108 1.1 oster wmirNode = &nodes[i]; i += nWmirNodes;
2109 1.1 oster blockNode = &nodes[i]; i += 1;
2110 1.1 oster unblockNode = &nodes[i]; i += 1;
2111 1.1 oster termNode = &nodes[i]; i += 1;
2112 1.1 oster RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
2113 1.1 oster
2114 1.1 oster /* this dag can commit immediately */
2115 1.1 oster dag_h->numCommitNodes = 0;
2116 1.1 oster dag_h->numCommits = 0;
2117 1.1 oster dag_h->numSuccedents = 1;
2118 1.1 oster
2119 1.1 oster /* initialize the unblock and term nodes */
2120 1.1 oster rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Nil", allocList);
2121 1.1 oster rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
2122 1.1 oster rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
2123 1.1 oster
2124 1.1 oster /* initialize the wnd nodes */
2125 1.1 oster if (nWndNodes > 0) {
2126 1.1 oster pda = asmap->physInfo;
2127 1.1 oster for (i = 0; i < nWndNodes; i++) {
2128 1.1 oster rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
2129 1.1 oster RF_ASSERT(pda != NULL);
2130 1.1 oster wndNode[i].params[0].p = pda;
2131 1.1 oster wndNode[i].params[1].p = pda->bufPtr;
2132 1.1 oster wndNode[i].params[2].v = parityStripeID;
2133 1.1 oster wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2134 1.1 oster pda = pda->next;
2135 1.1 oster }
2136 1.1 oster RF_ASSERT(pda == NULL);
2137 1.1 oster }
2138 1.1 oster
2139 1.1 oster /* initialize the mirror nodes */
2140 1.1 oster if (nWmirNodes > 0) {
2141 1.1 oster pda = asmap->physInfo;
2142 1.1 oster pdaP = asmap->parityInfo;
2143 1.1 oster for (i = 0; i < nWmirNodes; i++) {
2144 1.1 oster rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
2145 1.1 oster RF_ASSERT(pda != NULL);
2146 1.1 oster wmirNode[i].params[0].p = pdaP;
2147 1.1 oster wmirNode[i].params[1].p = pda->bufPtr;
2148 1.1 oster wmirNode[i].params[2].v = parityStripeID;
2149 1.1 oster wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2150 1.1 oster pda = pda->next;
2151 1.1 oster pdaP = pdaP->next;
2152 1.1 oster }
2153 1.1 oster RF_ASSERT(pda == NULL);
2154 1.1 oster RF_ASSERT(pdaP == NULL);
2155 1.1 oster }
2156 1.1 oster
2157 1.1 oster /* link the header node to the block node */
2158 1.1 oster RF_ASSERT(dag_h->numSuccedents == 1);
2159 1.1 oster RF_ASSERT(blockNode->numAntecedents == 0);
2160 1.1 oster dag_h->succedents[0] = blockNode;
2161 1.1 oster
2162 1.1 oster /* link the block node to the write nodes */
2163 1.1 oster RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
2164 1.1 oster for (i = 0; i < nWndNodes; i++) {
2165 1.1 oster RF_ASSERT(wndNode[i].numAntecedents == 1);
2166 1.1 oster blockNode->succedents[i] = &wndNode[i];
2167 1.1 oster wndNode[i].antecedents[0] = blockNode;
2168 1.1 oster wndNode[i].antType[0] = rf_control;
2169 1.1 oster }
2170 1.1 oster for (i = 0; i < nWmirNodes; i++) {
2171 1.1 oster RF_ASSERT(wmirNode[i].numAntecedents == 1);
2172 1.1 oster blockNode->succedents[i + nWndNodes] = &wmirNode[i];
2173 1.1 oster wmirNode[i].antecedents[0] = blockNode;
2174 1.1 oster wmirNode[i].antType[0] = rf_control;
2175 1.1 oster }
2176 1.1 oster
2177 1.1 oster /* link the write nodes to the unblock node */
2178 1.1 oster RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
2179 1.1 oster for (i = 0; i < nWndNodes; i++) {
2180 1.1 oster RF_ASSERT(wndNode[i].numSuccedents == 1);
2181 1.1 oster wndNode[i].succedents[0] = unblockNode;
2182 1.1 oster unblockNode->antecedents[i] = &wndNode[i];
2183 1.1 oster unblockNode->antType[i] = rf_control;
2184 1.1 oster }
2185 1.1 oster for (i = 0; i < nWmirNodes; i++) {
2186 1.1 oster RF_ASSERT(wmirNode[i].numSuccedents == 1);
2187 1.1 oster wmirNode[i].succedents[0] = unblockNode;
2188 1.1 oster unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
2189 1.1 oster unblockNode->antType[i + nWndNodes] = rf_control;
2190 1.1 oster }
2191 1.1 oster
2192 1.1 oster /* link the unblock node to the term node */
2193 1.1 oster RF_ASSERT(unblockNode->numSuccedents == 1);
2194 1.1 oster RF_ASSERT(termNode->numAntecedents == 1);
2195 1.1 oster RF_ASSERT(termNode->numSuccedents == 0);
2196 1.1 oster unblockNode->succedents[0] = termNode;
2197 1.1 oster termNode->antecedents[0] = unblockNode;
2198 1.1 oster termNode->antType[0] = rf_control;
2199 1.1 oster
2200 1.1 oster return;
2201 1.1 oster }
2202