Home | History | Annotate | Line # | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
     23  * Copyright (c) 2011, 2016 by Delphix. All rights reserved.
     24  * Copyright (c) 2013 Steven Hartland. All rights reserved.
     25  * Copyright (c) 2014 Spectra Logic Corporation, All rights reserved.
     26  * Copyright (c) 2014 Integros [integros.com]
     27  * Copyright 2016 Nexenta Systems, Inc.  All rights reserved.
     28  */
     29 
     30 #include <sys/dsl_pool.h>
     31 #include <sys/dsl_dataset.h>
     32 #include <sys/dsl_prop.h>
     33 #include <sys/dsl_dir.h>
     34 #include <sys/dsl_synctask.h>
     35 #include <sys/dsl_scan.h>
     36 #include <sys/dnode.h>
     37 #include <sys/dmu_tx.h>
     38 #include <sys/dmu_objset.h>
     39 #include <sys/arc.h>
     40 #include <sys/zap.h>
     41 #include <sys/zio.h>
     42 #include <sys/zfs_context.h>
     43 #include <sys/fs/zfs.h>
     44 #include <sys/zfs_znode.h>
     45 #include <sys/spa_impl.h>
     46 #include <sys/dsl_deadlist.h>
     47 #include <sys/bptree.h>
     48 #include <sys/zfeature.h>
     49 #include <sys/zil_impl.h>
     50 #include <sys/dsl_userhold.h>
     51 
     52 #if defined(__FreeBSD__) && defined(_KERNEL)
     53 #include <sys/types.h>
     54 #include <sys/sysctl.h>
     55 #endif
     56 
     57 /*
     58  * ZFS Write Throttle
     59  * ------------------
     60  *
     61  * ZFS must limit the rate of incoming writes to the rate at which it is able
     62  * to sync data modifications to the backend storage. Throttling by too much
     63  * creates an artificial limit; throttling by too little can only be sustained
     64  * for short periods and would lead to highly lumpy performance. On a per-pool
     65  * basis, ZFS tracks the amount of modified (dirty) data. As operations change
     66  * data, the amount of dirty data increases; as ZFS syncs out data, the amount
     67  * of dirty data decreases. When the amount of dirty data exceeds a
     68  * predetermined threshold further modifications are blocked until the amount
     69  * of dirty data decreases (as data is synced out).
     70  *
     71  * The limit on dirty data is tunable, and should be adjusted according to
     72  * both the IO capacity and available memory of the system. The larger the
     73  * window, the more ZFS is able to aggregate and amortize metadata (and data)
     74  * changes. However, memory is a limited resource, and allowing for more dirty
     75  * data comes at the cost of keeping other useful data in memory (for example
     76  * ZFS data cached by the ARC).
     77  *
     78  * Implementation
     79  *
 * As buffers are modified dsl_pool_dirty_space() increments both the per-
 * txg (dp_dirty_pertxg[]) and poolwide (dp_dirty_total) accounting of
 * dirty space used; dsl_pool_undirty_space() decrements those values as data
 * is synced out from dsl_pool_sync(). While only the poolwide value is
 * relevant, the per-txg value is useful for debugging. The tunable
 * zfs_dirty_data_max determines the dirty space limit. Once that value is
 * exceeded, new writes are halted until space frees up.
     87  *
     88  * The zfs_dirty_data_sync tunable dictates the threshold at which we
     89  * ensure that there is a txg syncing (see the comment in txg.c for a full
     90  * description of transaction group stages).
     91  *
     92  * The IO scheduler uses both the dirty space limit and current amount of
     93  * dirty data as inputs. Those values affect the number of concurrent IOs ZFS
     94  * issues. See the comment in vdev_queue.c for details of the IO scheduler.
     95  *
     96  * The delay is also calculated based on the amount of dirty data.  See the
     97  * comment above dmu_tx_delay() for details.
     98  */
     99 
/*
 * zfs_dirty_data_max will be set to zfs_dirty_data_max_percent% of all memory,
 * capped at zfs_dirty_data_max_max.  It can also be overridden in /etc/system.
 *
 * Note that zfs_dirty_data_max has no initializer here, so it is zero until
 * something outside this block assigns it.
 */
uint64_t zfs_dirty_data_max;
uint64_t zfs_dirty_data_max_max = 4ULL * 1024 * 1024 * 1024;	/* 4 GiB */
int zfs_dirty_data_max_percent = 10;

/*
 * If there is at least this much dirty data, push out a txg.
 * (See dsl_pool_need_dirty_delay(), which kicks the txg when the poolwide
 * dirty total exceeds this value.)
 */
uint64_t zfs_dirty_data_sync = 64 * 1024 * 1024;	/* 64 MiB */

/*
 * Once there is this amount of dirty data, the dmu_tx_delay() will kick in
 * and delay each transaction.
 * This value should be >= zfs_vdev_async_write_active_max_dirty_percent.
 *
 * The value is a percentage of zfs_dirty_data_max; see the computation of
 * delay_min_bytes in dsl_pool_need_dirty_delay().
 */
int zfs_delay_min_dirty_percent = 60;

/*
 * This controls how quickly the delay approaches infinity.
 * Larger values cause it to delay more for a given amount of dirty data.
 * Therefore larger values will cause there to be less dirty data for a
 * given throughput.
 *
 * For the smoothest delay, this value should be about 1 billion divided
 * by the maximum number of operations per second.  This will smoothly
 * handle between 10x and 1/10th this number.
 *
 * Note: zfs_delay_scale * zfs_dirty_data_max must be < 2^64, due to the
 * multiply in dmu_tx_delay().
 *
 * The default below evaluates to 500000, i.e. the rule of thumb above
 * applied to 2000 operations per second.
 */
uint64_t zfs_delay_scale = 1000 * 1000 * 1000 / 2000;
    134 
    135 
    136 #if defined(__FreeBSD__) && defined(_KERNEL)
    137 
/*
 * Not defined in this file; sysctl_zfs_delay_min_dirty_percent() uses it as
 * the lower bound for zfs_delay_min_dirty_percent.
 */
extern int zfs_vdev_async_write_active_max_dirty_percent;

SYSCTL_DECL(_vfs_zfs);

/* vfs.zfs.dirty_data_max: exported directly, no validation handler. */
SYSCTL_UQUAD(_vfs_zfs, OID_AUTO, dirty_data_max, CTLFLAG_RWTUN,
    &zfs_dirty_data_max, 0,
    "The maximum amount of dirty data in bytes after which new writes are "
    "halted until space becomes available");

/* vfs.zfs.dirty_data_max_max: declared with CTLFLAG_RDTUN rather than RWTUN. */
SYSCTL_UQUAD(_vfs_zfs, OID_AUTO, dirty_data_max_max, CTLFLAG_RDTUN,
    &zfs_dirty_data_max_max, 0,
    "The absolute cap on dirty_data_max when auto calculating");

/*
 * vfs.zfs.dirty_data_max_percent: routed through a handler so that only
 * values in [0, 100] are accepted (see sysctl_zfs_dirty_data_max_percent()).
 */
static int sysctl_zfs_dirty_data_max_percent(SYSCTL_HANDLER_ARGS);
SYSCTL_PROC(_vfs_zfs, OID_AUTO, dirty_data_max_percent,
    CTLTYPE_INT | CTLFLAG_MPSAFE | CTLFLAG_RWTUN, 0, sizeof(int),
    sysctl_zfs_dirty_data_max_percent, "I",
    "The percent of physical memory used to auto calculate dirty_data_max");

/* vfs.zfs.dirty_data_sync: exported directly, no validation handler. */
SYSCTL_UQUAD(_vfs_zfs, OID_AUTO, dirty_data_sync, CTLFLAG_RWTUN,
    &zfs_dirty_data_sync, 0,
    "Force a txg if the number of dirty buffer bytes exceed this value");

/*
 * vfs.zfs.delay_min_dirty_percent: the handler rejects values below
 * zfs_vdev_async_write_active_max_dirty_percent.
 */
static int sysctl_zfs_delay_min_dirty_percent(SYSCTL_HANDLER_ARGS);
/* No zfs_delay_min_dirty_percent tunable due to limit requirements */
SYSCTL_PROC(_vfs_zfs, OID_AUTO, delay_min_dirty_percent,
    CTLTYPE_INT | CTLFLAG_MPSAFE | CTLFLAG_RW, 0, sizeof(int),
    sysctl_zfs_delay_min_dirty_percent, "I",
    "The limit of outstanding dirty data before transations are delayed");

/*
 * vfs.zfs.delay_scale: the handler rejects values whose product with
 * zfs_dirty_data_max would not fit in 64 bits.
 */
static int sysctl_zfs_delay_scale(SYSCTL_HANDLER_ARGS);
/* No zfs_delay_scale tunable due to limit requirements */
SYSCTL_PROC(_vfs_zfs, OID_AUTO, delay_scale,
    CTLTYPE_U64 | CTLFLAG_MPSAFE | CTLFLAG_RW, 0, sizeof(uint64_t),
    sysctl_zfs_delay_scale, "QU",
    "Controls how quickly the delay approaches infinity");
    174 
    175 static int
    176 sysctl_zfs_dirty_data_max_percent(SYSCTL_HANDLER_ARGS)
    177 {
    178 	int val, err;
    179 
    180 	val = zfs_dirty_data_max_percent;
    181 	err = sysctl_handle_int(oidp, &val, 0, req);
    182 	if (err != 0 || req->newptr == NULL)
    183 		return (err);
    184 
    185 	if (val < 0 || val > 100)
    186 		return (EINVAL);
    187 
    188 	zfs_dirty_data_max_percent = val;
    189 
    190 	return (0);
    191 }
    192 
    193 static int
    194 sysctl_zfs_delay_min_dirty_percent(SYSCTL_HANDLER_ARGS)
    195 {
    196 	int val, err;
    197 
    198 	val = zfs_delay_min_dirty_percent;
    199 	err = sysctl_handle_int(oidp, &val, 0, req);
    200 	if (err != 0 || req->newptr == NULL)
    201 		return (err);
    202 
    203 	if (val < zfs_vdev_async_write_active_max_dirty_percent)
    204 		return (EINVAL);
    205 
    206 	zfs_delay_min_dirty_percent = val;
    207 
    208 	return (0);
    209 }
    210 
    211 static int
    212 sysctl_zfs_delay_scale(SYSCTL_HANDLER_ARGS)
    213 {
    214 	uint64_t val;
    215 	int err;
    216 
    217 	val = zfs_delay_scale;
    218 	err = sysctl_handle_64(oidp, &val, 0, req);
    219 	if (err != 0 || req->newptr == NULL)
    220 		return (err);
    221 
    222 	if (val > UINT64_MAX / zfs_dirty_data_max)
    223 		return (EINVAL);
    224 
    225 	zfs_delay_scale = val;
    226 
    227 	return (0);
    228 }
    229 #endif
    230 
/*
 * Throttle timing knobs, both initialised from MSEC2NSEC(10) (presumably
 * 10 milliseconds expressed in nanoseconds).
 * NOTE(review): nothing in the reviewed portion of this file reads these
 * variables -- confirm where, or whether, they are still consumed.
 */
hrtime_t zfs_throttle_delay = MSEC2NSEC(10);
hrtime_t zfs_throttle_resolution = MSEC2NSEC(10);
    233 
    234 int
    235 dsl_pool_open_special_dir(dsl_pool_t *dp, const char *name, dsl_dir_t **ddp)
    236 {
    237 	uint64_t obj;
    238 	int err;
    239 
    240 	err = zap_lookup(dp->dp_meta_objset,
    241 	    dsl_dir_phys(dp->dp_root_dir)->dd_child_dir_zapobj,
    242 	    name, sizeof (obj), 1, &obj);
    243 	if (err)
    244 		return (err);
    245 
    246 	return (dsl_dir_hold_obj(dp, obj, name, dp, ddp));
    247 }
    248 
    249 static dsl_pool_t *
    250 dsl_pool_open_impl(spa_t *spa, uint64_t txg)
    251 {
    252 	dsl_pool_t *dp;
    253 	blkptr_t *bp = spa_get_rootblkptr(spa);
    254 
    255 	dp = kmem_zalloc(sizeof (dsl_pool_t), KM_SLEEP);
    256 	dp->dp_spa = spa;
    257 	dp->dp_meta_rootbp = *bp;
    258 	rrw_init(&dp->dp_config_rwlock, B_TRUE);
    259 	txg_init(dp, txg);
    260 
    261 	txg_list_create(&dp->dp_dirty_datasets,
    262 	    offsetof(dsl_dataset_t, ds_dirty_link));
    263 	txg_list_create(&dp->dp_dirty_zilogs,
    264 	    offsetof(zilog_t, zl_dirty_link));
    265 	txg_list_create(&dp->dp_dirty_dirs,
    266 	    offsetof(dsl_dir_t, dd_dirty_link));
    267 	txg_list_create(&dp->dp_sync_tasks,
    268 	    offsetof(dsl_sync_task_t, dst_node));
    269 
    270 	mutex_init(&dp->dp_lock, NULL, MUTEX_DEFAULT, NULL);
    271 	cv_init(&dp->dp_spaceavail_cv, NULL, CV_DEFAULT, NULL);
    272 
    273 	dp->dp_vnrele_taskq = taskq_create("zfs_vn_rele_taskq", 1, minclsyspri,
    274 	    1, 4, 0);
    275 
    276 	return (dp);
    277 }
    278 
    279 int
    280 dsl_pool_init(spa_t *spa, uint64_t txg, dsl_pool_t **dpp)
    281 {
    282 	int err;
    283 	dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
    284 
    285 	err = dmu_objset_open_impl(spa, NULL, &dp->dp_meta_rootbp,
    286 	    &dp->dp_meta_objset);
    287 	if (err != 0)
    288 		dsl_pool_close(dp);
    289 	else
    290 		*dpp = dp;
    291 
    292 	return (err);
    293 }
    294 
/*
 * Load the pool-wide DSL state from the MOS into dp.
 *
 * dp->dp_meta_objset must already be set up (dsl_pool_init() does this).
 * With dp_config_rwlock held as writer for the duration, this looks up and
 * holds the root dsl_dir and the special directories, the origin snapshot,
 * the free bpobj and several optional MOS objects, then initializes the scan
 * state.
 *
 * Returns 0 on success or the first error encountered.  References taken
 * before a failure are not dropped here; dsl_pool_close() releases whichever
 * of them are non-NULL.
 */
int
dsl_pool_open(dsl_pool_t *dp)
{
	int err;
	dsl_dir_t *dd;
	dsl_dataset_t *ds;
	uint64_t obj;

	rrw_enter(&dp->dp_config_rwlock, RW_WRITER, FTAG);
	/* Find and hold the root dsl_dir; everything else hangs off it. */
	err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
	    DMU_POOL_ROOT_DATASET, sizeof (uint64_t), 1,
	    &dp->dp_root_dir_obj);
	if (err)
		goto out;

	err = dsl_dir_hold_obj(dp, dp->dp_root_dir_obj,
	    NULL, dp, &dp->dp_root_dir);
	if (err)
		goto out;

	err = dsl_pool_open_special_dir(dp, MOS_DIR_NAME, &dp->dp_mos_dir);
	if (err)
		goto out;

	if (spa_version(dp->dp_spa) >= SPA_VERSION_ORIGIN) {
		/*
		 * Hold the snapshot that precedes the origin dir's head
		 * dataset as dp_origin_snap.  The dir and head dataset are
		 * only needed temporarily and are released again below.
		 */
		err = dsl_pool_open_special_dir(dp, ORIGIN_DIR_NAME, &dd);
		if (err)
			goto out;
		err = dsl_dataset_hold_obj(dp,
		    dsl_dir_phys(dd)->dd_head_dataset_obj, FTAG, &ds);
		if (err == 0) {
			err = dsl_dataset_hold_obj(dp,
			    dsl_dataset_phys(ds)->ds_prev_snap_obj, dp,
			    &dp->dp_origin_snap);
			dsl_dataset_rele(ds, FTAG);
		}
		dsl_dir_rele(dd, dp);
		if (err)
			goto out;
	}

	if (spa_version(dp->dp_spa) >= SPA_VERSION_DEADLISTS) {
		err = dsl_pool_open_special_dir(dp, FREE_DIR_NAME,
		    &dp->dp_free_dir);
		if (err)
			goto out;

		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
		    DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj);
		if (err)
			goto out;
		/* A failure to open the free bpobj is fatal (VERIFY0). */
		VERIFY0(bpobj_open(&dp->dp_free_bpobj,
		    dp->dp_meta_objset, obj));
	}

	/*
	 * Note: errors ignored, because the leak dir will not exist if we
	 * have not encountered a leak yet.
	 */
	(void) dsl_pool_open_special_dir(dp, LEAK_DIR_NAME,
	    &dp->dp_leak_dir);

	if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_ASYNC_DESTROY)) {
		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
		    DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
		    &dp->dp_bptree_obj);
		if (err != 0)
			goto out;
	}

	if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMPTY_BPOBJ)) {
		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
		    DMU_POOL_EMPTY_BPOBJ, sizeof (uint64_t), 1,
		    &dp->dp_empty_bpobj);
		if (err != 0)
			goto out;
	}

	/* The temporary-userrefs object is optional: ENOENT is not an error. */
	err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
	    DMU_POOL_TMP_USERREFS, sizeof (uint64_t), 1,
	    &dp->dp_tmp_userrefs_obj);
	if (err == ENOENT)
		err = 0;
	if (err)
		goto out;

	err = dsl_scan_init(dp, dp->dp_tx.tx_open_txg);

out:
	/* Single exit point so the config lock is dropped on every path. */
	rrw_exit(&dp->dp_config_rwlock, FTAG);
	return (err);
}
    387 
/*
 * Tear down a dsl_pool_t: drop the references taken while opening or
 * creating the pool, evict the MOS, destroy the per-txg lists, locks and
 * taskq, and free dp itself.
 *
 * This is also the cleanup path for a failed dsl_pool_init(), which is why
 * every reference is checked for NULL before being released.
 */
void
dsl_pool_close(dsl_pool_t *dp)
{
	/*
	 * Drop our references from dsl_pool_open().
	 *
	 * Since we held the origin_snap from "syncing" context (which
	 * includes pool-opening context), it actually only got a "ref"
	 * and not a hold, so just drop that here.
	 */
	if (dp->dp_origin_snap)
		dsl_dataset_rele(dp->dp_origin_snap, dp);
	if (dp->dp_mos_dir)
		dsl_dir_rele(dp->dp_mos_dir, dp);
	if (dp->dp_free_dir)
		dsl_dir_rele(dp->dp_free_dir, dp);
	if (dp->dp_leak_dir)
		dsl_dir_rele(dp->dp_leak_dir, dp);
	if (dp->dp_root_dir)
		dsl_dir_rele(dp->dp_root_dir, dp);

	/*
	 * Called unconditionally, even if the free bpobj was never opened.
	 * NOTE(review): presumably bpobj_close() tolerates that -- confirm.
	 */
	bpobj_close(&dp->dp_free_bpobj);

	/* undo the dmu_objset_open_impl(mos) from dsl_pool_init() */
	if (dp->dp_meta_objset)
		dmu_objset_evict(dp->dp_meta_objset);

	txg_list_destroy(&dp->dp_dirty_datasets);
	txg_list_destroy(&dp->dp_dirty_zilogs);
	txg_list_destroy(&dp->dp_sync_tasks);
	txg_list_destroy(&dp->dp_dirty_dirs);

	/*
	 * We can't set retry to TRUE since we're explicitly specifying
	 * a spa to flush. This is good enough; any missed buffers for
	 * this spa won't cause trouble, and they'll eventually fall
	 * out of the ARC just like any other unused buffer.
	 */
	arc_flush(dp->dp_spa, FALSE);

	txg_fini(dp);
	dsl_scan_fini(dp);
	dmu_buf_user_evict_wait();

	/* Undo the initialization done in dsl_pool_open_impl(). */
	rrw_destroy(&dp->dp_config_rwlock);
	mutex_destroy(&dp->dp_lock);
	cv_destroy(&dp->dp_spaceavail_cv);
	taskq_destroy(dp->dp_vnrele_taskq);
	if (dp->dp_blkstats)
		kmem_free(dp->dp_blkstats, sizeof (zfs_all_blkstats_t));
	kmem_free(dp, sizeof (dsl_pool_t));
}
    440 
/*
 * Create the DSL structures of a brand-new pool in transaction group txg
 * and return the resulting dsl_pool_t.
 *
 * Using a tx assigned to txg and with dp_config_rwlock held as writer, this
 * creates the MOS, the pool directory object, the root dsl_dir, the special
 * directories, the free bpobj and origin (by SPA version), and the root
 * dataset with its objset.  In kernel builds the root file system is
 * populated via zfs_create_fs() using zplprops.
 *
 * There is no error return: most failures trip a VERIFY.
 */
dsl_pool_t *
dsl_pool_create(spa_t *spa, nvlist_t *zplprops, uint64_t txg)
{
	int err;
	dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
	dmu_tx_t *tx = dmu_tx_create_assigned(dp, txg);
	objset_t *os;
	dsl_dataset_t *ds;
	uint64_t obj;

	rrw_enter(&dp->dp_config_rwlock, RW_WRITER, FTAG);

	/* create and open the MOS (meta-objset) */
	dp->dp_meta_objset = dmu_objset_create_impl(spa,
	    NULL, &dp->dp_meta_rootbp, DMU_OST_META, tx);

	/* create the pool directory */
	/*
	 * NOTE(review): err is checked with ASSERT0 only, unlike the
	 * VERIFY0 used for the other steps -- confirm this is intended.
	 */
	err = zap_create_claim(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
	    DMU_OT_OBJECT_DIRECTORY, DMU_OT_NONE, 0, tx);
	ASSERT0(err);

	/* Initialize scan structures */
	VERIFY0(dsl_scan_init(dp, txg));

	/* create and open the root dir */
	dp->dp_root_dir_obj = dsl_dir_create_sync(dp, NULL, NULL, tx);
	VERIFY0(dsl_dir_hold_obj(dp, dp->dp_root_dir_obj,
	    NULL, dp, &dp->dp_root_dir));

	/* create and open the meta-objset dir */
	(void) dsl_dir_create_sync(dp, dp->dp_root_dir, MOS_DIR_NAME, tx);
	VERIFY0(dsl_pool_open_special_dir(dp,
	    MOS_DIR_NAME, &dp->dp_mos_dir));

	if (spa_version(spa) >= SPA_VERSION_DEADLISTS) {
		/* create and open the free dir */
		(void) dsl_dir_create_sync(dp, dp->dp_root_dir,
		    FREE_DIR_NAME, tx);
		VERIFY0(dsl_pool_open_special_dir(dp,
		    FREE_DIR_NAME, &dp->dp_free_dir));

		/* create and open the free_bplist */
		obj = bpobj_alloc(dp->dp_meta_objset, SPA_OLD_MAXBLOCKSIZE, tx);
		VERIFY(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
		    DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx) == 0);
		VERIFY0(bpobj_open(&dp->dp_free_bpobj,
		    dp->dp_meta_objset, obj));
	}

	if (spa_version(spa) >= SPA_VERSION_DSL_SCRUB)
		dsl_pool_create_origin(dp, tx);

	/* create the root dataset */
	obj = dsl_dataset_create_sync_dd(dp->dp_root_dir, NULL, 0, tx);

	/* create the root objset */
	VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG, &ds));
	/* ds_bp_rwlock is held as reader while the dataset's bp is used. */
	rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
	os = dmu_objset_create_impl(dp->dp_spa, ds,
	    dsl_dataset_get_blkptr(ds), DMU_OST_ZFS, tx);
	rrw_exit(&ds->ds_bp_rwlock, FTAG);
#ifdef _KERNEL
	zfs_create_fs(os, kcred, zplprops, tx);
#endif
	dsl_dataset_rele(ds, FTAG);

	dmu_tx_commit(tx);

	rrw_exit(&dp->dp_config_rwlock, FTAG);

	return (dp);
}
    513 
    514 /*
    515  * Account for the meta-objset space in its placeholder dsl_dir.
    516  */
    517 void
    518 dsl_pool_mos_diduse_space(dsl_pool_t *dp,
    519     int64_t used, int64_t comp, int64_t uncomp)
    520 {
    521 	ASSERT3U(comp, ==, uncomp); /* it's all metadata */
    522 	mutex_enter(&dp->dp_lock);
    523 	dp->dp_mos_used_delta += used;
    524 	dp->dp_mos_compressed_delta += comp;
    525 	dp->dp_mos_uncompressed_delta += uncomp;
    526 	mutex_exit(&dp->dp_lock);
    527 }
    528 
    529 static void
    530 dsl_pool_sync_mos(dsl_pool_t *dp, dmu_tx_t *tx)
    531 {
    532 	zio_t *zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
    533 	dmu_objset_sync(dp->dp_meta_objset, zio, tx);
    534 	VERIFY0(zio_wait(zio));
    535 	dprintf_bp(&dp->dp_meta_rootbp, "meta objset rootbp is %s", "");
    536 	spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
    537 }
    538 
    539 static void
    540 dsl_pool_dirty_delta(dsl_pool_t *dp, int64_t delta)
    541 {
    542 	ASSERT(MUTEX_HELD(&dp->dp_lock));
    543 
    544 	if (delta < 0)
    545 		ASSERT3U(-delta, <=, dp->dp_dirty_total);
    546 
    547 	dp->dp_dirty_total += delta;
    548 
    549 	/*
    550 	 * Note: we signal even when increasing dp_dirty_total.
    551 	 * This ensures forward progress -- each thread wakes the next waiter.
    552 	 */
    553 	if (dp->dp_dirty_total <= zfs_dirty_data_max)
    554 		cv_signal(&dp->dp_spaceavail_cv);
    555 }
    556 
/*
 * Sync one pass of transaction group txg for this pool.
 *
 * In order: write out every dirty dataset, settle the dirty-space
 * accounting for the txg, apply user/group quota updates and sync the
 * datasets dirtied by them, run the per-dataset and per-dir sync completion
 * work, charge accumulated MOS space deltas to dp_mos_dir, sync the MOS if
 * it has dirty or freed dnodes, and finally run any queued sync tasks.
 * All of this happens inside a single tx assigned to txg.
 */
void
dsl_pool_sync(dsl_pool_t *dp, uint64_t txg)
{
	zio_t *zio;
	dmu_tx_t *tx;
	dsl_dir_t *dd;
	dsl_dataset_t *ds;
	objset_t *mos = dp->dp_meta_objset;
	list_t synced_datasets;

	/* Remembers every dataset synced below so it can be finished later. */
	list_create(&synced_datasets, sizeof (dsl_dataset_t),
	    offsetof(dsl_dataset_t, ds_synced_link));

	tx = dmu_tx_create_assigned(dp, txg);

	/*
	 * Write out all dirty blocks of dirty datasets.
	 */
	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
	while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
		/*
		 * We must not sync any non-MOS datasets twice, because
		 * we may have taken a snapshot of them.  However, we
		 * may sync newly-created datasets on pass 2.
		 */
		ASSERT(!list_link_active(&ds->ds_synced_link));
		list_insert_tail(&synced_datasets, ds);
		dsl_dataset_sync(ds, zio, tx);
	}
	VERIFY0(zio_wait(zio));

	/*
	 * We have written all of the accounted dirty data, so our
	 * dp_space_towrite should now be zero.  However, some seldom-used
	 * code paths do not adhere to this (e.g. dbuf_undirty(), also
	 * rounding error in dbuf_write_physdone).
	 * Shore up the accounting of any dirtied space now.
	 *
	 * (The call below clears whatever remains in this txg's
	 * dp_dirty_pertxg[] slot.)
	 */
	dsl_pool_undirty_space(dp, dp->dp_dirty_pertxg[txg & TXG_MASK], txg);

	/*
	 * Update the long range free counter after
	 * we're done syncing user data
	 */
	mutex_enter(&dp->dp_lock);
	ASSERT(spa_sync_pass(dp->dp_spa) == 1 ||
	    dp->dp_long_free_dirty_pertxg[txg & TXG_MASK] == 0);
	dp->dp_long_free_dirty_pertxg[txg & TXG_MASK] = 0;
	mutex_exit(&dp->dp_lock);

	/*
	 * After the data blocks have been written (ensured by the zio_wait()
	 * above), update the user/group space accounting.
	 */
	for (ds = list_head(&synced_datasets); ds != NULL;
	    ds = list_next(&synced_datasets, ds)) {
		dmu_objset_do_userquota_updates(ds->ds_objset, tx);
	}

	/*
	 * Sync the datasets again to push out the changes due to
	 * userspace updates.  This must be done before we process the
	 * sync tasks, so that any snapshots will have the correct
	 * user accounting information (and we won't get confused
	 * about which blocks are part of the snapshot).
	 */
	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
	while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
		/*
		 * Every dataset found here was already put on
		 * synced_datasets by the first loop.
		 * NOTE(review): the dmu_buf_rele() presumably drops the
		 * extra hold taken when the dataset was re-dirtied --
		 * confirm against dsl_dataset_dirty().
		 */
		ASSERT(list_link_active(&ds->ds_synced_link));
		dmu_buf_rele(ds->ds_dbuf, ds);
		dsl_dataset_sync(ds, zio, tx);
	}
	VERIFY0(zio_wait(zio));

	/*
	 * Now that the datasets have been completely synced, we can
	 * clean up our in-memory structures accumulated while syncing:
	 *
	 *  - move dead blocks from the pending deadlist to the on-disk deadlist
	 *  - release hold from dsl_dataset_dirty()
	 */
	while ((ds = list_remove_head(&synced_datasets)) != NULL) {
		dsl_dataset_sync_done(ds, tx);
	}
	while ((dd = txg_list_remove(&dp->dp_dirty_dirs, txg)) != NULL) {
		dsl_dir_sync(dd, tx);
	}

	/*
	 * The MOS's space is accounted for in the pool/$MOS
	 * (dp_mos_dir).  We can't modify the mos while we're syncing
	 * it, so we remember the deltas and apply them here.
	 *
	 * NOTE(review): the deltas are read and reset here without taking
	 * dp_lock, whereas dsl_pool_mos_diduse_space() updates them under
	 * it -- confirm nothing else can update them at this point.
	 */
	if (dp->dp_mos_used_delta != 0 || dp->dp_mos_compressed_delta != 0 ||
	    dp->dp_mos_uncompressed_delta != 0) {
		dsl_dir_diduse_space(dp->dp_mos_dir, DD_USED_HEAD,
		    dp->dp_mos_used_delta,
		    dp->dp_mos_compressed_delta,
		    dp->dp_mos_uncompressed_delta, tx);
		dp->dp_mos_used_delta = 0;
		dp->dp_mos_compressed_delta = 0;
		dp->dp_mos_uncompressed_delta = 0;
	}

	/* Only sync the MOS if it has dirty or freed dnodes in this txg. */
	if (list_head(&mos->os_dirty_dnodes[txg & TXG_MASK]) != NULL ||
	    list_head(&mos->os_free_dnodes[txg & TXG_MASK]) != NULL) {
		dsl_pool_sync_mos(dp, tx);
	}

	/*
	 * If we modify a dataset in the same txg that we want to destroy it,
	 * its dsl_dir's dd_dbuf will be dirty, and thus have a hold on it.
	 * dsl_dir_destroy_check() will fail if there are unexpected holds.
	 * Therefore, we want to sync the MOS (thus syncing the dd_dbuf
	 * and clearing the hold on it) before we process the sync_tasks.
	 * The MOS data dirtied by the sync_tasks will be synced on the next
	 * pass.
	 */
	if (!txg_list_empty(&dp->dp_sync_tasks, txg)) {
		dsl_sync_task_t *dst;
		/*
		 * No more sync tasks should have been added while we
		 * were syncing.
		 */
		ASSERT3U(spa_sync_pass(dp->dp_spa), ==, 1);
		while ((dst = txg_list_remove(&dp->dp_sync_tasks, txg)) != NULL)
			dsl_sync_task_sync(dst, tx);
	}

	dmu_tx_commit(tx);

	DTRACE_PROBE2(dsl_pool_sync__done, dsl_pool_t *dp, dp, uint64_t, txg);
}
    690 
    691 void
    692 dsl_pool_sync_done(dsl_pool_t *dp, uint64_t txg)
    693 {
    694 	zilog_t *zilog;
    695 
    696 	while (zilog = txg_list_head(&dp->dp_dirty_zilogs, txg)) {
    697 		dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
    698 		/*
    699 		 * We don't remove the zilog from the dp_dirty_zilogs
    700 		 * list until after we've cleaned it. This ensures that
    701 		 * callers of zilog_is_dirty() receive an accurate
    702 		 * answer when they are racing with the spa sync thread.
    703 		 */
    704 		zil_clean(zilog, txg);
    705 		(void) txg_list_remove_this(&dp->dp_dirty_zilogs, zilog, txg);
    706 		ASSERT(!dmu_objset_is_dirty(zilog->zl_os, txg));
    707 		dmu_buf_rele(ds->ds_dbuf, zilog);
    708 	}
    709 	ASSERT(!dmu_objset_is_dirty(dp->dp_meta_objset, txg));
    710 }
    711 
    712 /*
    713  * TRUE if the current thread is the tx_sync_thread or if we
    714  * are being called from SPA context during pool initialization.
    715  */
    716 int
    717 dsl_pool_sync_context(dsl_pool_t *dp)
    718 {
    719 	return (curthread == dp->dp_tx.tx_sync_thread ||
    720 	    spa_is_initializing(dp->dp_spa));
    721 }
    722 
    723 uint64_t
    724 dsl_pool_adjustedsize(dsl_pool_t *dp, boolean_t netfree)
    725 {
    726 	uint64_t space, resv;
    727 
    728 	/*
    729 	 * If we're trying to assess whether it's OK to do a free,
    730 	 * cut the reservation in half to allow forward progress
    731 	 * (e.g. make it possible to rm(1) files from a full pool).
    732 	 */
    733 	space = spa_get_dspace(dp->dp_spa);
    734 	resv = spa_get_slop_space(dp->dp_spa);
    735 	if (netfree)
    736 		resv >>= 1;
    737 
    738 	return (space - resv);
    739 }
    740 
    741 boolean_t
    742 dsl_pool_need_dirty_delay(dsl_pool_t *dp)
    743 {
    744 	uint64_t delay_min_bytes =
    745 	    zfs_dirty_data_max * zfs_delay_min_dirty_percent / 100;
    746 	boolean_t rv;
    747 
    748 	mutex_enter(&dp->dp_lock);
    749 	if (dp->dp_dirty_total > zfs_dirty_data_sync)
    750 		txg_kick(dp);
    751 	rv = (dp->dp_dirty_total > delay_min_bytes);
    752 	mutex_exit(&dp->dp_lock);
    753 	return (rv);
    754 }
    755 
    756 void
    757 dsl_pool_dirty_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
    758 {
    759 	if (space > 0) {
    760 		mutex_enter(&dp->dp_lock);
    761 		dp->dp_dirty_pertxg[tx->tx_txg & TXG_MASK] += space;
    762 		dsl_pool_dirty_delta(dp, space);
    763 		mutex_exit(&dp->dp_lock);
    764 	}
    765 }
    766 
    767 void
    768 dsl_pool_undirty_space(dsl_pool_t *dp, int64_t space, uint64_t txg)
    769 {
    770 	ASSERT3S(space, >=, 0);
    771 	if (space == 0)
    772 		return;
    773 	mutex_enter(&dp->dp_lock);
    774 	if (dp->dp_dirty_pertxg[txg & TXG_MASK] < space) {
    775 		/* XXX writing something we didn't dirty? */
    776 		space = dp->dp_dirty_pertxg[txg & TXG_MASK];
    777 	}
    778 	ASSERT3U(dp->dp_dirty_pertxg[txg & TXG_MASK], >=, space);
    779 	dp->dp_dirty_pertxg[txg & TXG_MASK] -= space;
    780 	ASSERT3U(dp->dp_dirty_total, >=, space);
    781 	dsl_pool_dirty_delta(dp, -space);
    782 	mutex_exit(&dp->dp_lock);
    783 }
    784 
/*
 * Per-dataset callback used by dsl_pool_upgrade_clones() (via
 * dmu_objset_find_dp()); arg is the syncing dmu_tx_t.
 *
 * Starting from the head dataset hds, walk backwards through the previous
 * snapshots for as long as each one's ds_next_snap_obj points back at the
 * dataset we came from.  The walk ends either with no previous snapshot at
 * all, in which case the dataset is attached to the pool's origin snapshot
 * (dp_origin_snap), or at a previous snapshot that does not point back.
 * In both cases the final dataset is then recorded in that snapshot's
 * next-clones ZAP object, which is created on demand.
 *
 * Returns 0 on success or the error from a failed dataset hold.
 */
/* ARGSUSED */
static int
upgrade_clones_cb(dsl_pool_t *dp, dsl_dataset_t *hds, void *arg)
{
	dmu_tx_t *tx = arg;
	dsl_dataset_t *ds, *prev = NULL;
	int err;

	/* Take our own hold so ds can be swapped for older datasets below. */
	err = dsl_dataset_hold_obj(dp, hds->ds_object, FTAG, &ds);
	if (err)
		return (err);

	while (dsl_dataset_phys(ds)->ds_prev_snap_obj != 0) {
		err = dsl_dataset_hold_obj(dp,
		    dsl_dataset_phys(ds)->ds_prev_snap_obj, FTAG, &prev);
		if (err) {
			dsl_dataset_rele(ds, FTAG);
			return (err);
		}

		/*
		 * prev does not name ds as its next snapshot: stop here
		 * with both ds and prev still held.
		 */
		if (dsl_dataset_phys(prev)->ds_next_snap_obj != ds->ds_object)
			break;
		/* Step back one snapshot; only the new ds stays held. */
		dsl_dataset_rele(ds, FTAG);
		ds = prev;
		prev = NULL;
	}

	if (prev == NULL) {
		/*
		 * The walk ran out of previous snapshots.  dp_origin_snap
		 * is borrowed from the pool here, not held with FTAG.
		 */
		prev = dp->dp_origin_snap;

		/*
		 * The $ORIGIN can't have any data, or the accounting
		 * will be wrong.
		 */
		/*
		 * NOTE(review): the lock taken is ds's ds_bp_rwlock while
		 * the block pointer inspected belongs to prev -- confirm
		 * this is intended.
		 */
		rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
		ASSERT0(dsl_dataset_phys(prev)->ds_bp.blk_birth);
		rrw_exit(&ds->ds_bp_rwlock, FTAG);

		/* The origin doesn't get attached to itself */
		if (ds->ds_object == prev->ds_object) {
			dsl_dataset_rele(ds, FTAG);
			return (0);
		}

		/* Link ds to the origin snapshot as its previous snapshot. */
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		dsl_dataset_phys(ds)->ds_prev_snap_obj = prev->ds_object;
		dsl_dataset_phys(ds)->ds_prev_snap_txg =
		    dsl_dataset_phys(prev)->ds_creation_txg;

		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
		dsl_dir_phys(ds->ds_dir)->dd_origin_obj = prev->ds_object;

		dmu_buf_will_dirty(prev->ds_dbuf, tx);
		dsl_dataset_phys(prev)->ds_num_children++;

		if (dsl_dataset_phys(ds)->ds_next_snap_obj == 0) {
			/* This hold is tagged with ds, not FTAG. */
			ASSERT(ds->ds_prev == NULL);
			VERIFY0(dsl_dataset_hold_obj(dp,
			    dsl_dataset_phys(ds)->ds_prev_snap_obj,
			    ds, &ds->ds_prev));
		}
	}

	ASSERT3U(dsl_dir_phys(ds->ds_dir)->dd_origin_obj, ==, prev->ds_object);
	ASSERT3U(dsl_dataset_phys(ds)->ds_prev_snap_obj, ==, prev->ds_object);

	/* Create prev's next-clones ZAP on first use, then record ds in it. */
	if (dsl_dataset_phys(prev)->ds_next_clones_obj == 0) {
		dmu_buf_will_dirty(prev->ds_dbuf, tx);
		dsl_dataset_phys(prev)->ds_next_clones_obj =
		    zap_create(dp->dp_meta_objset,
		    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
	}
	VERIFY0(zap_add_int(dp->dp_meta_objset,
	    dsl_dataset_phys(prev)->ds_next_clones_obj, ds->ds_object, tx));

	dsl_dataset_rele(ds, FTAG);
	/* Only release prev if we took the hold ourselves in the loop. */
	if (prev != dp->dp_origin_snap)
		dsl_dataset_rele(prev, FTAG);
	return (0);
}
    865 
    866 void
    867 dsl_pool_upgrade_clones(dsl_pool_t *dp, dmu_tx_t *tx)
    868 {
    869 	ASSERT(dmu_tx_is_syncing(tx));
    870 	ASSERT(dp->dp_origin_snap != NULL);
    871 
    872 	VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj, upgrade_clones_cb,
    873 	    tx, DS_FIND_CHILDREN | DS_FIND_SERIALIZE));
    874 }
    875 
    876 /* ARGSUSED */
    877 static int
    878 upgrade_dir_clones_cb(dsl_pool_t *dp, dsl_dataset_t *ds, void *arg)
    879 {
    880 	dmu_tx_t *tx = arg;
    881 	objset_t *mos = dp->dp_meta_objset;
    882 
    883 	if (dsl_dir_phys(ds->ds_dir)->dd_origin_obj != 0) {
    884 		dsl_dataset_t *origin;
    885 
    886 		VERIFY0(dsl_dataset_hold_obj(dp,
    887 		    dsl_dir_phys(ds->ds_dir)->dd_origin_obj, FTAG, &origin));
    888 
    889 		if (dsl_dir_phys(origin->ds_dir)->dd_clones == 0) {
    890 			dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
    891 			dsl_dir_phys(origin->ds_dir)->dd_clones =
    892 			    zap_create(mos, DMU_OT_DSL_CLONES, DMU_OT_NONE,
    893 			    0, tx);
    894 		}
    895 
    896 		VERIFY0(zap_add_int(dp->dp_meta_objset,
    897 		    dsl_dir_phys(origin->ds_dir)->dd_clones,
    898 		    ds->ds_object, tx));
    899 
    900 		dsl_dataset_rele(origin, FTAG);
    901 	}
    902 	return (0);
    903 }
    904 
    905 void
    906 dsl_pool_upgrade_dir_clones(dsl_pool_t *dp, dmu_tx_t *tx)
    907 {
    908 	ASSERT(dmu_tx_is_syncing(tx));
    909 	uint64_t obj;
    910 
    911 	(void) dsl_dir_create_sync(dp, dp->dp_root_dir, FREE_DIR_NAME, tx);
    912 	VERIFY0(dsl_pool_open_special_dir(dp,
    913 	    FREE_DIR_NAME, &dp->dp_free_dir));
    914 
    915 	/*
    916 	 * We can't use bpobj_alloc(), because spa_version() still
    917 	 * returns the old version, and we need a new-version bpobj with
    918 	 * subobj support.  So call dmu_object_alloc() directly.
    919 	 */
    920 	obj = dmu_object_alloc(dp->dp_meta_objset, DMU_OT_BPOBJ,
    921 	    SPA_OLD_MAXBLOCKSIZE, DMU_OT_BPOBJ_HDR, sizeof (bpobj_phys_t), tx);
    922 	VERIFY0(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
    923 	    DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx));
    924 	VERIFY0(bpobj_open(&dp->dp_free_bpobj, dp->dp_meta_objset, obj));
    925 
    926 	VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj,
    927 	    upgrade_dir_clones_cb, tx, DS_FIND_CHILDREN | DS_FIND_SERIALIZE));
    928 }
    929 
    930 void
    931 dsl_pool_create_origin(dsl_pool_t *dp, dmu_tx_t *tx)
    932 {
    933 	uint64_t dsobj;
    934 	dsl_dataset_t *ds;
    935 
    936 	ASSERT(dmu_tx_is_syncing(tx));
    937 	ASSERT(dp->dp_origin_snap == NULL);
    938 	ASSERT(rrw_held(&dp->dp_config_rwlock, RW_WRITER));
    939 
    940 	/* create the origin dir, ds, & snap-ds */
    941 	dsobj = dsl_dataset_create_sync(dp->dp_root_dir, ORIGIN_DIR_NAME,
    942 	    NULL, 0, kcred, tx);
    943 	VERIFY0(dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
    944 	dsl_dataset_snapshot_sync_impl(ds, ORIGIN_DIR_NAME, tx);
    945 	VERIFY0(dsl_dataset_hold_obj(dp, dsl_dataset_phys(ds)->ds_prev_snap_obj,
    946 	    dp, &dp->dp_origin_snap));
    947 	dsl_dataset_rele(ds, FTAG);
    948 }
    949 
    950 taskq_t *
    951 dsl_pool_vnrele_taskq(dsl_pool_t *dp)
    952 {
    953 	return (dp->dp_vnrele_taskq);
    954 }
    955 
    956 /*
    957  * Walk through the pool-wide zap object of temporary snapshot user holds
    958  * and release them.
    959  */
    960 void
    961 dsl_pool_clean_tmp_userrefs(dsl_pool_t *dp)
    962 {
    963 	zap_attribute_t za;
    964 	zap_cursor_t zc;
    965 	objset_t *mos = dp->dp_meta_objset;
    966 	uint64_t zapobj = dp->dp_tmp_userrefs_obj;
    967 	nvlist_t *holds;
    968 
    969 	if (zapobj == 0)
    970 		return;
    971 	ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
    972 
    973 	holds = fnvlist_alloc();
    974 
    975 	for (zap_cursor_init(&zc, mos, zapobj);
    976 	    zap_cursor_retrieve(&zc, &za) == 0;
    977 	    zap_cursor_advance(&zc)) {
    978 		char *htag;
    979 		nvlist_t *tags;
    980 
    981 		htag = strchr(za.za_name, '-');
    982 		*htag = '\0';
    983 		++htag;
    984 		if (nvlist_lookup_nvlist(holds, za.za_name, &tags) != 0) {
    985 			tags = fnvlist_alloc();
    986 			fnvlist_add_boolean(tags, htag);
    987 			fnvlist_add_nvlist(holds, za.za_name, tags);
    988 			fnvlist_free(tags);
    989 		} else {
    990 			fnvlist_add_boolean(tags, htag);
    991 		}
    992 	}
    993 	dsl_dataset_user_release_tmp(dp, holds);
    994 	fnvlist_free(holds);
    995 	zap_cursor_fini(&zc);
    996 }
    997 
    998 /*
    999  * Create the pool-wide zap object for storing temporary snapshot holds.
   1000  */
   1001 void
   1002 dsl_pool_user_hold_create_obj(dsl_pool_t *dp, dmu_tx_t *tx)
   1003 {
   1004 	objset_t *mos = dp->dp_meta_objset;
   1005 
   1006 	ASSERT(dp->dp_tmp_userrefs_obj == 0);
   1007 	ASSERT(dmu_tx_is_syncing(tx));
   1008 
   1009 	dp->dp_tmp_userrefs_obj = zap_create_link(mos, DMU_OT_USERREFS,
   1010 	    DMU_POOL_DIRECTORY_OBJECT, DMU_POOL_TMP_USERREFS, tx);
   1011 }
   1012 
   1013 static int
   1014 dsl_pool_user_hold_rele_impl(dsl_pool_t *dp, uint64_t dsobj,
   1015     const char *tag, uint64_t now, dmu_tx_t *tx, boolean_t holding)
   1016 {
   1017 	objset_t *mos = dp->dp_meta_objset;
   1018 	uint64_t zapobj = dp->dp_tmp_userrefs_obj;
   1019 	char *name;
   1020 	int error;
   1021 
   1022 	ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
   1023 	ASSERT(dmu_tx_is_syncing(tx));
   1024 
   1025 	/*
   1026 	 * If the pool was created prior to SPA_VERSION_USERREFS, the
   1027 	 * zap object for temporary holds might not exist yet.
   1028 	 */
   1029 	if (zapobj == 0) {
   1030 		if (holding) {
   1031 			dsl_pool_user_hold_create_obj(dp, tx);
   1032 			zapobj = dp->dp_tmp_userrefs_obj;
   1033 		} else {
   1034 			return (SET_ERROR(ENOENT));
   1035 		}
   1036 	}
   1037 
   1038 	name = kmem_asprintf("%llx-%s", (u_longlong_t)dsobj, tag);
   1039 	if (holding)
   1040 		error = zap_add(mos, zapobj, name, 8, 1, &now, tx);
   1041 	else
   1042 		error = zap_remove(mos, zapobj, name, tx);
   1043 	strfree(name);
   1044 
   1045 	return (error);
   1046 }
   1047 
   1048 /*
   1049  * Add a temporary hold for the given dataset object and tag.
   1050  */
   1051 int
   1052 dsl_pool_user_hold(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
   1053     uint64_t now, dmu_tx_t *tx)
   1054 {
   1055 	return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, now, tx, B_TRUE));
   1056 }
   1057 
   1058 /*
   1059  * Release a temporary hold for the given dataset object and tag.
   1060  */
   1061 int
   1062 dsl_pool_user_release(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
   1063     dmu_tx_t *tx)
   1064 {
   1065 	return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, 0,
   1066 	    tx, B_FALSE));
   1067 }
   1068 
   1069 /*
   1070  * DSL Pool Configuration Lock
   1071  *
   1072  * The dp_config_rwlock protects against changes to DSL state (e.g. dataset
   1073  * creation / destruction / rename / property setting).  It must be held for
   1074  * read to hold a dataset or dsl_dir.  I.e. you must call
   1075  * dsl_pool_config_enter() or dsl_pool_hold() before calling
   1076  * dsl_{dataset,dir}_hold{_obj}.  In most circumstances, the dp_config_rwlock
   1077  * must be held continuously until all datasets and dsl_dirs are released.
   1078  *
   1079  * The only exception to this rule is that if a "long hold" is placed on
   1080  * a dataset, then the dp_config_rwlock may be dropped while the dataset
   1081  * is still held.  The long hold will prevent the dataset from being
   1082  * destroyed -- the destroy will fail with EBUSY.  A long hold can be
   1083  * obtained by calling dsl_dataset_long_hold(), or by "owning" a dataset
   1084  * (by calling dsl_{dataset,objset}_{try}own{_obj}).
   1085  *
   1086  * Legitimate long-holders (including owners) should be long-running, cancelable
   1087  * tasks that should cause "zfs destroy" to fail.  This includes DMU
   1088  * consumers (i.e. a ZPL filesystem being mounted or ZVOL being open),
   1089  * "zfs send", and "zfs diff".  There are several other long-holders whose
   1090  * uses are suboptimal (e.g. "zfs promote", and zil_suspend()).
   1091  *
   1092  * The usual formula for long-holding would be:
   1093  * dsl_pool_hold()
   1094  * dsl_dataset_hold()
   1095  * ... perform checks ...
   1096  * dsl_dataset_long_hold()
   1097  * dsl_pool_rele()
   1098  * ... perform long-running task ...
   1099  * dsl_dataset_long_rele()
   1100  * dsl_dataset_rele()
   1101  *
   1102  * Note that when the long hold is released, the dataset is still held but
   1103  * the pool is not held.  The dataset may change arbitrarily during this time
   1104  * (e.g. it could be destroyed).  Therefore you shouldn't do anything to the
   1105  * dataset except release it.
   1106  *
   1107  * User-initiated operations (e.g. ioctls, zfs_ioc_*()) are either read-only
   1108  * or modifying operations.
   1109  *
   1110  * Modifying operations should generally use dsl_sync_task().  The synctask
   1111  * infrastructure enforces proper locking strategy with respect to the
   1112  * dp_config_rwlock.  See the comment above dsl_sync_task() for details.
   1113  *
   1114  * Read-only operations will manually hold the pool, then the dataset, obtain
   1115  * information from the dataset, then release the pool and dataset.
   1116  * dmu_objset_{hold,rele}() are convenience routines that also do the pool
   1117  * hold/rele.
   1118  */
   1119 
   1120 int
   1121 dsl_pool_hold(const char *name, void *tag, dsl_pool_t **dp)
   1122 {
   1123 	spa_t *spa;
   1124 	int error;
   1125 
   1126 	error = spa_open(name, &spa, tag);
   1127 	if (error == 0) {
   1128 		*dp = spa_get_dsl(spa);
   1129 		dsl_pool_config_enter(*dp, tag);
   1130 	}
   1131 	return (error);
   1132 }
   1133 
/*
 * Counterpart of dsl_pool_hold(): leave the pool config lock entered on
 * behalf of "tag", then drop the spa reference with spa_close() using the
 * same tag.
 */
   1134 void
   1135 dsl_pool_rele(dsl_pool_t *dp, void *tag)
   1136 {
   1137 	dsl_pool_config_exit(dp, tag);
   1138 	spa_close(dp->dp_spa, tag);
   1139 }
   1140 
/*
 * Enter dp->dp_config_rwlock as a reader on behalf of "tag".
 *
 * Asserts that rrw_held() does not already report the lock as held for
 * read, i.e. callers must not take it reentrantly.
 */
   1141 void
   1142 dsl_pool_config_enter(dsl_pool_t *dp, void *tag)
   1143 {
   1144 	/*
   1145 	 * We use a "reentrant" reader-writer lock, but not reentrantly.
   1146 	 *
   1147 	 * The rrwlock can (with the track_all flag) track all reading threads,
   1148 	 * which is very useful for debugging which code path failed to release
   1149 	 * the lock, and for verifying that the *current* thread does hold
   1150 	 * the lock.
   1151 	 *
   1152 	 * (Unlike a rwlock, which knows that N threads hold it for
   1153 	 * read, but not *which* threads, so rw_held(RW_READER) returns TRUE
   1154 	 * if any thread holds it for read, even if this thread doesn't).
   1155 	 */
   1156 	ASSERT(!rrw_held(&dp->dp_config_rwlock, RW_READER));
   1157 	rrw_enter(&dp->dp_config_rwlock, RW_READER, tag);
   1158 }
   1159 
/*
 * Same as dsl_pool_config_enter(), except that the read lock is taken
 * with rrw_enter_read_prio() instead of rrw_enter().
 *
 * NOTE(review): presumably this lets the reader in ahead of waiting
 * writers -- confirm against the rrwlock implementation.
 */
   1160 void
   1161 dsl_pool_config_enter_prio(dsl_pool_t *dp, void *tag)
   1162 {
   1163 	ASSERT(!rrw_held(&dp->dp_config_rwlock, RW_READER));
   1164 	rrw_enter_read_prio(&dp->dp_config_rwlock, tag);
   1165 }
   1166 
/*
 * Exit dp->dp_config_rwlock on behalf of "tag" by calling rrw_exit().
 */
   1167 void
   1168 dsl_pool_config_exit(dsl_pool_t *dp, void *tag)
   1169 {
   1170 	rrw_exit(&dp->dp_config_rwlock, tag);
   1171 }
   1172 
   1173 boolean_t
   1174 dsl_pool_config_held(dsl_pool_t *dp)
   1175 {
   1176 	return (RRW_LOCK_HELD(&dp->dp_config_rwlock));
   1177 }
   1178 
   1179 boolean_t
   1180 dsl_pool_config_held_writer(dsl_pool_t *dp)
   1181 {
   1182 	return (RRW_WRITE_HELD(&dp->dp_config_rwlock));
   1183 }
   1184