Home | History | Annotate | Line # | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
     23  * Portions Copyright 2011 Martin Matuska <mm (at) FreeBSD.org>
     24  * Copyright (c) 2012, 2014 by Delphix. All rights reserved.
     25  */
     26 
     27 #include <sys/zfs_context.h>
     28 #include <sys/txg_impl.h>
     29 #include <sys/dmu_impl.h>
     30 #include <sys/dmu_tx.h>
     31 #include <sys/dsl_pool.h>
     32 #include <sys/dsl_scan.h>
     33 #include <sys/callb.h>
     34 
     35 /*
     36  * ZFS Transaction Groups
     37  * ----------------------
     38  *
     39  * ZFS transaction groups are, as the name implies, groups of transactions
     40  * that act on persistent state. ZFS asserts consistency at the granularity of
     41  * these transaction groups. Each successive transaction group (txg) is
     42  * assigned a 64-bit consecutive identifier. There are three active
     43  * transaction group states: open, quiescing, or syncing. At any given time,
     44  * there may be an active txg associated with each state; each active txg may
     45  * either be processing, or blocked waiting to enter the next state. There may
     46  * be up to three active txgs, and there is always a txg in the open state
     47  * (though it may be blocked waiting to enter the quiescing state). In broad
     48  * strokes, transactions -- operations that change in-memory structures -- are
     49  * accepted into the txg in the open state, and are completed while the txg is
     50  * in the open or quiescing states. The accumulated changes are written to
     51  * disk in the syncing state.
     52  *
     53  * Open
     54  *
     55  * When a new txg becomes active, it first enters the open state. New
     56  * transactions -- updates to in-memory structures -- are assigned to the
     57  * currently open txg. There is always a txg in the open state so that ZFS can
     58  * accept new changes (though the txg may refuse new changes if it has hit
     59  * some limit). ZFS advances the open txg to the next state for a variety of
     60  * reasons such as it hitting a time or size threshold, or the execution of an
     61  * administrative action that must be completed in the syncing state.
     62  *
     63  * Quiescing
     64  *
     65  * After a txg exits the open state, it enters the quiescing state. The
     66  * quiescing state is intended to provide a buffer between accepting new
     67  * transactions in the open state and writing them out to stable storage in
     68  * the syncing state. While quiescing, transactions can continue their
     69  * operation without delaying either of the other states. Typically, a txg is
     70  * in the quiescing state very briefly since the operations are bounded by
     71  * software latencies rather than, say, slower I/O latencies. After all
     72  * transactions complete, the txg is ready to enter the next state.
     73  *
     74  * Syncing
     75  *
     76  * In the syncing state, the in-memory state built up during the open and (to
     77  * a lesser degree) the quiescing states is written to stable storage. The
     78  * process of writing out modified data can, in turn, modify more data. For
     79  * example when we write new blocks, we need to allocate space for them; those
     80  * allocations modify metadata (space maps)... which themselves must be
     81  * written to stable storage. During the sync state, ZFS iterates, writing out
     82  * data until it converges and all in-memory changes have been written out.
     83  * The first such pass is the largest as it encompasses all the modified user
     84  * data (as opposed to filesystem metadata). Subsequent passes typically have
     85  * far less data to write as they consist exclusively of filesystem metadata.
     86  *
     87  * To ensure convergence, after a certain number of passes ZFS begins
     88  * overwriting locations on stable storage that had been allocated earlier in
     89  * the syncing state (and subsequently freed). ZFS usually allocates new
     90  * blocks to optimize for large, contiguous writes. For the syncing state to
     91  * converge however it must complete a pass where no new blocks are allocated
     92  * since each allocation requires a modification of persistent metadata.
     93  * Further, to hasten convergence, after a prescribed number of passes, ZFS
     94  * also defers frees, and stops compressing.
     95  *
     96  * In addition to writing out user data, we must also execute synctasks during
     97  * the syncing context. A synctask is the mechanism by which some
     98  * administrative activities work such as creating and destroying snapshots or
     99  * datasets. Note that when a synctask is initiated it enters the open txg,
    100  * and ZFS then pushes that txg as quickly as possible to completion of the
    101  * syncing state in order to reduce the latency of the administrative
    102  * activity. To complete the syncing state, ZFS writes out a new uberblock,
    103  * the root of the tree of blocks that comprise all state stored on the ZFS
    104  * pool. Finally, if there is a quiesced txg waiting, we signal that it can
    105  * now transition to the syncing state.
    106  */
    107 
/* Thread bodies created by txg_sync_start(); both are defined below. */
static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */

/*
 * Expose zfs_txg_timeout through sysctl as the "timeout" leaf of a "txg"
 * node created under _vfs_zfs.
 */
SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG");
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RWTUN, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");
    117 
    118 /*
    119  * Prepare the txg subsystem.
    120  */
    121 void
    122 txg_init(dsl_pool_t *dp, uint64_t txg)
    123 {
    124 	tx_state_t *tx = &dp->dp_tx;
    125 	int c;
    126 	bzero(tx, sizeof (tx_state_t));
    127 
    128 	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
    129 
    130 	for (c = 0; c < max_ncpus; c++) {
    131 		int i;
    132 
    133 		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
    134 		mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_DEFAULT,
    135 		    NULL);
    136 		for (i = 0; i < TXG_SIZE; i++) {
    137 			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
    138 			    NULL);
    139 			list_create(&tx->tx_cpu[c].tc_callbacks[i],
    140 			    sizeof (dmu_tx_callback_t),
    141 			    offsetof(dmu_tx_callback_t, dcb_node));
    142 		}
    143 	}
    144 
    145 	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
    146 
    147 	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
    148 	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
    149 	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
    150 	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
    151 	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
    152 
    153 	tx->tx_open_txg = txg;
    154 }
    155 
    156 /*
    157  * Close down the txg subsystem.
    158  */
    159 void
    160 txg_fini(dsl_pool_t *dp)
    161 {
    162 	tx_state_t *tx = &dp->dp_tx;
    163 	int c;
    164 
    165 	ASSERT(tx->tx_threads == 0);
    166 
    167 	mutex_destroy(&tx->tx_sync_lock);
    168 
    169 	cv_destroy(&tx->tx_sync_more_cv);
    170 	cv_destroy(&tx->tx_sync_done_cv);
    171 	cv_destroy(&tx->tx_quiesce_more_cv);
    172 	cv_destroy(&tx->tx_quiesce_done_cv);
    173 	cv_destroy(&tx->tx_exit_cv);
    174 
    175 	for (c = 0; c < max_ncpus; c++) {
    176 		int i;
    177 
    178 		mutex_destroy(&tx->tx_cpu[c].tc_open_lock);
    179 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
    180 		for (i = 0; i < TXG_SIZE; i++) {
    181 			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
    182 			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
    183 		}
    184 	}
    185 
    186 	if (tx->tx_commit_cb_taskq != NULL)
    187 		taskq_destroy(tx->tx_commit_cb_taskq);
    188 
    189 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
    190 
    191 	bzero(tx, sizeof (tx_state_t));
    192 }
    193 
    194 /*
    195  * Start syncing transaction groups.
    196  */
    197 void
    198 txg_sync_start(dsl_pool_t *dp)
    199 {
    200 	tx_state_t *tx = &dp->dp_tx;
    201 
    202 	mutex_enter(&tx->tx_sync_lock);
    203 
    204 	dprintf("pool %p\n", dp);
    205 
    206 	ASSERT(tx->tx_threads == 0);
    207 
    208 	tx->tx_threads = 2;
    209 
    210 	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
    211 	    dp, 0, &p0, TS_RUN, minclsyspri);
    212 
    213 	/*
    214 	 * The sync thread can need a larger-than-default stack size on
    215 	 * 32-bit x86.  This is due in part to nested pools and
    216 	 * scrub_visitbp() recursion.
    217 	 */
    218 	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
    219 	    dp, 0, &p0, TS_RUN, minclsyspri);
    220 
    221 	mutex_exit(&tx->tx_sync_lock);
    222 }
    223 
    224 static void
    225 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
    226 {
    227 	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
    228 	mutex_enter(&tx->tx_sync_lock);
    229 }
    230 
    231 static void
    232 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
    233 {
    234 	ASSERT(*tpp != NULL);
    235 	*tpp = NULL;
    236 	tx->tx_threads--;
    237 	cv_broadcast(&tx->tx_exit_cv);
    238 	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
    239 	thread_exit();
    240 }
    241 
    242 static void
    243 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time)
    244 {
    245 	CALLB_CPR_SAFE_BEGIN(cpr);
    246 
    247 	if (time)
    248 		(void) cv_timedwait(cv, &tx->tx_sync_lock, time);
    249 	else
    250 		cv_wait(cv, &tx->tx_sync_lock);
    251 
    252 	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
    253 }
    254 
    255 /*
    256  * Stop syncing transaction groups.
    257  */
    258 void
    259 txg_sync_stop(dsl_pool_t *dp)
    260 {
    261 	tx_state_t *tx = &dp->dp_tx;
    262 
    263 	dprintf("pool %p\n", dp);
    264 	/*
    265 	 * Finish off any work in progress.
    266 	 */
    267 	ASSERT(tx->tx_threads == 2);
    268 
    269 	/*
    270 	 * We need to ensure that we've vacated the deferred space_maps.
    271 	 */
    272 	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);
    273 
    274 	/*
    275 	 * Wake all sync threads and wait for them to die.
    276 	 */
    277 	mutex_enter(&tx->tx_sync_lock);
    278 
    279 	ASSERT(tx->tx_threads == 2);
    280 
    281 	tx->tx_exiting = 1;
    282 
    283 	cv_broadcast(&tx->tx_quiesce_more_cv);
    284 	cv_broadcast(&tx->tx_quiesce_done_cv);
    285 	cv_broadcast(&tx->tx_sync_more_cv);
    286 
    287 	while (tx->tx_threads != 0)
    288 		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
    289 
    290 	tx->tx_exiting = 0;
    291 
    292 	mutex_exit(&tx->tx_sync_lock);
    293 }
    294 
    295 uint64_t
    296 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
    297 {
    298 	tx_state_t *tx = &dp->dp_tx;
    299 	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
    300 	uint64_t txg;
    301 
    302 	mutex_enter(&tc->tc_open_lock);
    303 	txg = tx->tx_open_txg;
    304 
    305 	mutex_enter(&tc->tc_lock);
    306 	tc->tc_count[txg & TXG_MASK]++;
    307 	mutex_exit(&tc->tc_lock);
    308 
    309 	th->th_cpu = tc;
    310 	th->th_txg = txg;
    311 
    312 	return (txg);
    313 }
    314 
    315 void
    316 txg_rele_to_quiesce(txg_handle_t *th)
    317 {
    318 	tx_cpu_t *tc = th->th_cpu;
    319 
    320 	ASSERT(!MUTEX_HELD(&tc->tc_lock));
    321 	mutex_exit(&tc->tc_open_lock);
    322 }
    323 
    324 void
    325 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
    326 {
    327 	tx_cpu_t *tc = th->th_cpu;
    328 	int g = th->th_txg & TXG_MASK;
    329 
    330 	mutex_enter(&tc->tc_lock);
    331 	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
    332 	mutex_exit(&tc->tc_lock);
    333 }
    334 
    335 void
    336 txg_rele_to_sync(txg_handle_t *th)
    337 {
    338 	tx_cpu_t *tc = th->th_cpu;
    339 	int g = th->th_txg & TXG_MASK;
    340 
    341 	mutex_enter(&tc->tc_lock);
    342 	ASSERT(tc->tc_count[g] != 0);
    343 	if (--tc->tc_count[g] == 0)
    344 		cv_broadcast(&tc->tc_cv[g]);
    345 	mutex_exit(&tc->tc_lock);
    346 
    347 	th->th_cpu = NULL;	/* defensive */
    348 }
    349 
    350 /*
    351  * Blocks until all transactions in the group are committed.
    352  *
    353  * On return, the transaction group has reached a stable state in which it can
    354  * then be passed off to the syncing context.
    355  */
    356 static __noinline void
    357 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
    358 {
    359 	tx_state_t *tx = &dp->dp_tx;
    360 	int g = txg & TXG_MASK;
    361 	int c;
    362 
    363 	/*
    364 	 * Grab all tc_open_locks so nobody else can get into this txg.
    365 	 */
    366 	for (c = 0; c < max_ncpus; c++)
    367 		mutex_enter(&tx->tx_cpu[c].tc_open_lock);
    368 
    369 	ASSERT(txg == tx->tx_open_txg);
    370 	tx->tx_open_txg++;
    371 	tx->tx_open_time = gethrtime();
    372 
    373 	DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
    374 	DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);
    375 
    376 	/*
    377 	 * Now that we've incremented tx_open_txg, we can let threads
    378 	 * enter the next transaction group.
    379 	 */
    380 	for (c = 0; c < max_ncpus; c++)
    381 		mutex_exit(&tx->tx_cpu[c].tc_open_lock);
    382 
    383 	/*
    384 	 * Quiesce the transaction group by waiting for everyone to txg_exit().
    385 	 */
    386 	for (c = 0; c < max_ncpus; c++) {
    387 		tx_cpu_t *tc = &tx->tx_cpu[c];
    388 		mutex_enter(&tc->tc_lock);
    389 		while (tc->tc_count[g] != 0)
    390 			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
    391 		mutex_exit(&tc->tc_lock);
    392 	}
    393 }
    394 
    395 static void
    396 txg_do_callbacks(void *arg)
    397 {
    398 	list_t *cb_list = arg;
    399 
    400 	dmu_tx_do_callbacks(cb_list, 0);
    401 
    402 	list_destroy(cb_list);
    403 
    404 	kmem_free(cb_list, sizeof (list_t));
    405 }
    406 
    407 /*
    408  * Dispatch the commit callbacks registered on this txg to worker threads.
    409  *
    410  * If no callbacks are registered for a given TXG, nothing happens.
    411  * This function creates a taskq for the associated pool, if needed.
    412  */
    413 static void
    414 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
    415 {
    416 	int c;
    417 	tx_state_t *tx = &dp->dp_tx;
    418 	list_t *cb_list;
    419 
    420 	for (c = 0; c < max_ncpus; c++) {
    421 		tx_cpu_t *tc = &tx->tx_cpu[c];
    422 		/*
    423 		 * No need to lock tx_cpu_t at this point, since this can
    424 		 * only be called once a txg has been synced.
    425 		 */
    426 
    427 		int g = txg & TXG_MASK;
    428 
    429 		if (list_is_empty(&tc->tc_callbacks[g]))
    430 			continue;
    431 
    432 		if (tx->tx_commit_cb_taskq == NULL) {
    433 			/*
    434 			 * Commit callback taskq hasn't been created yet.
    435 			 */
    436 			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
    437 			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
    438 			    TASKQ_PREPOPULATE);
    439 		}
    440 
    441 		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
    442 		list_create(cb_list, sizeof (dmu_tx_callback_t),
    443 		    offsetof(dmu_tx_callback_t, dcb_node));
    444 
    445 		list_move_tail(cb_list, &tc->tc_callbacks[g]);
    446 
    447 		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
    448 		    txg_do_callbacks, cb_list, TQ_SLEEP);
    449 	}
    450 }
    451 
    452 static void
    453 txg_sync_thread(void *arg)
    454 {
    455 	dsl_pool_t *dp = arg;
    456 	spa_t *spa = dp->dp_spa;
    457 	tx_state_t *tx = &dp->dp_tx;
    458 	callb_cpr_t cpr;
    459 	uint64_t start, delta;
    460 
    461 	txg_thread_enter(tx, &cpr);
    462 
    463 	start = delta = 0;
    464 	for (;;) {
    465 		uint64_t timeout = zfs_txg_timeout * hz;
    466 		uint64_t timer;
    467 		uint64_t txg;
    468 
    469 		/*
    470 		 * We sync when we're scanning, there's someone waiting
    471 		 * on us, or the quiesce thread has handed off a txg to
    472 		 * us, or we have reached our timeout.
    473 		 */
    474 		timer = (delta >= timeout ? 0 : timeout - delta);
    475 		while (!dsl_scan_active(dp->dp_scan) &&
    476 		    !tx->tx_exiting && timer > 0 &&
    477 		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
    478 		    tx->tx_quiesced_txg == 0 &&
    479 		    dp->dp_dirty_total < zfs_dirty_data_sync) {
    480 			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
    481 			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
    482 			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
    483 			delta = ddi_get_lbolt() - start;
    484 			timer = (delta > timeout ? 0 : timeout - delta);
    485 		}
    486 
    487 		/*
    488 		 * Wait until the quiesce thread hands off a txg to us,
    489 		 * prompting it to do so if necessary.
    490 		 */
    491 		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
    492 			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
    493 				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
    494 			cv_broadcast(&tx->tx_quiesce_more_cv);
    495 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
    496 		}
    497 
    498 		if (tx->tx_exiting)
    499 			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
    500 
    501 		/*
    502 		 * Consume the quiesced txg which has been handed off to
    503 		 * us.  This may cause the quiescing thread to now be
    504 		 * able to quiesce another txg, so we must signal it.
    505 		 */
    506 		txg = tx->tx_quiesced_txg;
    507 		tx->tx_quiesced_txg = 0;
    508 		tx->tx_syncing_txg = txg;
    509 		DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
    510 		cv_broadcast(&tx->tx_quiesce_more_cv);
    511 
    512 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    513 		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    514 		mutex_exit(&tx->tx_sync_lock);
    515 
    516 		start = ddi_get_lbolt();
    517 		spa_sync(spa, txg);
    518 		delta = ddi_get_lbolt() - start;
    519 
    520 		mutex_enter(&tx->tx_sync_lock);
    521 		tx->tx_synced_txg = txg;
    522 		tx->tx_syncing_txg = 0;
    523 		DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
    524 		cv_broadcast(&tx->tx_sync_done_cv);
    525 
    526 		/*
    527 		 * Dispatch commit callbacks to worker threads.
    528 		 */
    529 		txg_dispatch_callbacks(dp, txg);
    530 	}
    531 }
    532 
    533 static void
    534 txg_quiesce_thread(void *arg)
    535 {
    536 	dsl_pool_t *dp = arg;
    537 	tx_state_t *tx = &dp->dp_tx;
    538 	callb_cpr_t cpr;
    539 
    540 	txg_thread_enter(tx, &cpr);
    541 
    542 	for (;;) {
    543 		uint64_t txg;
    544 
    545 		/*
    546 		 * We quiesce when there's someone waiting on us.
    547 		 * However, we can only have one txg in "quiescing" or
    548 		 * "quiesced, waiting to sync" state.  So we wait until
    549 		 * the "quiesced, waiting to sync" txg has been consumed
    550 		 * by the sync thread.
    551 		 */
    552 		while (!tx->tx_exiting &&
    553 		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
    554 		    tx->tx_quiesced_txg != 0))
    555 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
    556 
    557 		if (tx->tx_exiting)
    558 			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
    559 
    560 		txg = tx->tx_open_txg;
    561 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    562 		    txg, tx->tx_quiesce_txg_waiting,
    563 		    tx->tx_sync_txg_waiting);
    564 		mutex_exit(&tx->tx_sync_lock);
    565 		txg_quiesce(dp, txg);
    566 		mutex_enter(&tx->tx_sync_lock);
    567 
    568 		/*
    569 		 * Hand this txg off to the sync thread.
    570 		 */
    571 		dprintf("quiesce done, handing off txg %llu\n", txg);
    572 		tx->tx_quiesced_txg = txg;
    573 		DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
    574 		cv_broadcast(&tx->tx_sync_more_cv);
    575 		cv_broadcast(&tx->tx_quiesce_done_cv);
    576 	}
    577 }
    578 
    579 /*
    580  * Delay this thread by delay nanoseconds if we are still in the open
    581  * transaction group and there is already a waiting txg quiesing or quiesced.
    582  * Abort the delay if this txg stalls or enters the quiesing state.
    583  */
    584 void
    585 txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
    586 {
    587 	tx_state_t *tx = &dp->dp_tx;
    588 	hrtime_t start = gethrtime();
    589 
    590 	/* don't delay if this txg could transition to quiescing immediately */
    591 	if (tx->tx_open_txg > txg ||
    592 	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
    593 		return;
    594 
    595 	mutex_enter(&tx->tx_sync_lock);
    596 	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
    597 		mutex_exit(&tx->tx_sync_lock);
    598 		return;
    599 	}
    600 
    601 	while (gethrtime() - start < delay &&
    602 	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) {
    603 		(void) cv_timedwait_hires(&tx->tx_quiesce_more_cv,
    604 		    &tx->tx_sync_lock, delay, resolution, 0);
    605 	}
    606 
    607 	mutex_exit(&tx->tx_sync_lock);
    608 }
    609 
    610 void
    611 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
    612 {
    613 	tx_state_t *tx = &dp->dp_tx;
    614 
    615 	ASSERT(!dsl_pool_config_held(dp));
    616 
    617 	mutex_enter(&tx->tx_sync_lock);
    618 	ASSERT(tx->tx_threads == 2);
    619 	if (txg == 0)
    620 		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
    621 	if (tx->tx_sync_txg_waiting < txg)
    622 		tx->tx_sync_txg_waiting = txg;
    623 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    624 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    625 	while (tx->tx_synced_txg < txg) {
    626 		dprintf("broadcasting sync more "
    627 		    "tx_synced=%llu waiting=%llu dp=%p\n",
    628 		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
    629 		cv_broadcast(&tx->tx_sync_more_cv);
    630 		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
    631 	}
    632 	mutex_exit(&tx->tx_sync_lock);
    633 }
    634 
    635 void
    636 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
    637 {
    638 	tx_state_t *tx = &dp->dp_tx;
    639 
    640 	ASSERT(!dsl_pool_config_held(dp));
    641 
    642 	mutex_enter(&tx->tx_sync_lock);
    643 	ASSERT(tx->tx_threads == 2);
    644 	if (txg == 0)
    645 		txg = tx->tx_open_txg + 1;
    646 	if (tx->tx_quiesce_txg_waiting < txg)
    647 		tx->tx_quiesce_txg_waiting = txg;
    648 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    649 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    650 	while (tx->tx_open_txg < txg) {
    651 		cv_broadcast(&tx->tx_quiesce_more_cv);
    652 		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
    653 	}
    654 	mutex_exit(&tx->tx_sync_lock);
    655 }
    656 
    657 /*
    658  * If there isn't a txg syncing or in the pipeline, push another txg through
    659  * the pipeline by queiscing the open txg.
    660  */
    661 void
    662 txg_kick(dsl_pool_t *dp)
    663 {
    664 	tx_state_t *tx = &dp->dp_tx;
    665 
    666 	ASSERT(!dsl_pool_config_held(dp));
    667 
    668 	mutex_enter(&tx->tx_sync_lock);
    669 	if (tx->tx_syncing_txg == 0 &&
    670 	    tx->tx_quiesce_txg_waiting <= tx->tx_open_txg &&
    671 	    tx->tx_sync_txg_waiting <= tx->tx_synced_txg &&
    672 	    tx->tx_quiesced_txg <= tx->tx_synced_txg) {
    673 		tx->tx_quiesce_txg_waiting = tx->tx_open_txg + 1;
    674 		cv_broadcast(&tx->tx_quiesce_more_cv);
    675 	}
    676 	mutex_exit(&tx->tx_sync_lock);
    677 }
    678 
    679 boolean_t
    680 txg_stalled(dsl_pool_t *dp)
    681 {
    682 	tx_state_t *tx = &dp->dp_tx;
    683 	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
    684 }
    685 
    686 boolean_t
    687 txg_sync_waiting(dsl_pool_t *dp)
    688 {
    689 	tx_state_t *tx = &dp->dp_tx;
    690 
    691 	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
    692 	    tx->tx_quiesced_txg != 0);
    693 }
    694 
    695 /*
    696  * Per-txg object lists.
    697  */
    698 void
    699 txg_list_create(txg_list_t *tl, size_t offset)
    700 {
    701 	int t;
    702 
    703 	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
    704 
    705 	tl->tl_offset = offset;
    706 
    707 	for (t = 0; t < TXG_SIZE; t++)
    708 		tl->tl_head[t] = NULL;
    709 }
    710 
    711 void
    712 txg_list_destroy(txg_list_t *tl)
    713 {
    714 	int t;
    715 
    716 	for (t = 0; t < TXG_SIZE; t++)
    717 		ASSERT(txg_list_empty(tl, t));
    718 
    719 	mutex_destroy(&tl->tl_lock);
    720 }
    721 
    722 boolean_t
    723 txg_list_empty(txg_list_t *tl, uint64_t txg)
    724 {
    725 	return (tl->tl_head[txg & TXG_MASK] == NULL);
    726 }
    727 
    728 /*
    729  * Returns true if all txg lists are empty.
    730  *
    731  * Warning: this is inherently racy (an item could be added immediately after this
    732  * function returns). We don't bother with the lock because it wouldn't change the
    733  * semantics.
    734  */
    735 boolean_t
    736 txg_all_lists_empty(txg_list_t *tl)
    737 {
    738 	for (int i = 0; i < TXG_SIZE; i++) {
    739 		if (!txg_list_empty(tl, i)) {
    740 			return (B_FALSE);
    741 		}
    742 	}
    743 	return (B_TRUE);
    744 }
    745 
    746 /*
    747  * Add an entry to the list (unless it's already on the list).
    748  * Returns B_TRUE if it was actually added.
    749  */
    750 boolean_t
    751 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
    752 {
    753 	int t = txg & TXG_MASK;
    754 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    755 	boolean_t add;
    756 
    757 	mutex_enter(&tl->tl_lock);
    758 	add = (tn->tn_member[t] == 0);
    759 	if (add) {
    760 		tn->tn_member[t] = 1;
    761 		tn->tn_next[t] = tl->tl_head[t];
    762 		tl->tl_head[t] = tn;
    763 	}
    764 	mutex_exit(&tl->tl_lock);
    765 
    766 	return (add);
    767 }
    768 
    769 /*
    770  * Add an entry to the end of the list, unless it's already on the list.
    771  * (walks list to find end)
    772  * Returns B_TRUE if it was actually added.
    773  */
    774 boolean_t
    775 txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
    776 {
    777 	int t = txg & TXG_MASK;
    778 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    779 	boolean_t add;
    780 
    781 	mutex_enter(&tl->tl_lock);
    782 	add = (tn->tn_member[t] == 0);
    783 	if (add) {
    784 		txg_node_t **tp;
    785 
    786 		for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
    787 			continue;
    788 
    789 		tn->tn_member[t] = 1;
    790 		tn->tn_next[t] = NULL;
    791 		*tp = tn;
    792 	}
    793 	mutex_exit(&tl->tl_lock);
    794 
    795 	return (add);
    796 }
    797 
    798 /*
    799  * Remove the head of the list and return it.
    800  */
    801 void *
    802 txg_list_remove(txg_list_t *tl, uint64_t txg)
    803 {
    804 	int t = txg & TXG_MASK;
    805 	txg_node_t *tn;
    806 	void *p = NULL;
    807 
    808 	mutex_enter(&tl->tl_lock);
    809 	if ((tn = tl->tl_head[t]) != NULL) {
    810 		p = (char *)tn - tl->tl_offset;
    811 		tl->tl_head[t] = tn->tn_next[t];
    812 		tn->tn_next[t] = NULL;
    813 		tn->tn_member[t] = 0;
    814 	}
    815 	mutex_exit(&tl->tl_lock);
    816 
    817 	return (p);
    818 }
    819 
    820 /*
    821  * Remove a specific item from the list and return it.
    822  */
    823 void *
    824 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
    825 {
    826 	int t = txg & TXG_MASK;
    827 	txg_node_t *tn, **tp;
    828 
    829 	mutex_enter(&tl->tl_lock);
    830 
    831 	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
    832 		if ((char *)tn - tl->tl_offset == p) {
    833 			*tp = tn->tn_next[t];
    834 			tn->tn_next[t] = NULL;
    835 			tn->tn_member[t] = 0;
    836 			mutex_exit(&tl->tl_lock);
    837 			return (p);
    838 		}
    839 	}
    840 
    841 	mutex_exit(&tl->tl_lock);
    842 
    843 	return (NULL);
    844 }
    845 
    846 boolean_t
    847 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
    848 {
    849 	int t = txg & TXG_MASK;
    850 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    851 
    852 	return (tn->tn_member[t] != 0);
    853 }
    854 
    855 /*
    856  * Walk a txg list -- only safe if you know it's not changing.
    857  */
    858 void *
    859 txg_list_head(txg_list_t *tl, uint64_t txg)
    860 {
    861 	int t = txg & TXG_MASK;
    862 	txg_node_t *tn = tl->tl_head[t];
    863 
    864 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    865 }
    866 
    867 void *
    868 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
    869 {
    870 	int t = txg & TXG_MASK;
    871 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    872 
    873 	tn = tn->tn_next[t];
    874 
    875 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    876 }
    877