Home | History | Annotate | Line # | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
     23  * Copyright (c) 2011, 2016 by Delphix. All rights reserved.
     24  * Copyright (c) 2011 Nexenta Systems, Inc. All rights reserved.
     25  * Copyright (c) 2014 Integros [integros.com]
     26  */
     27 
     28 #include <sys/sysmacros.h>
     29 #include <sys/zfs_context.h>
     30 #include <sys/fm/fs/zfs.h>
     31 #include <sys/spa.h>
     32 #include <sys/txg.h>
     33 #include <sys/spa_impl.h>
     34 #include <sys/vdev_impl.h>
     35 #include <sys/zio_impl.h>
     36 #include <sys/zio_compress.h>
     37 #include <sys/zio_checksum.h>
     38 #include <sys/dmu_objset.h>
     39 #include <sys/arc.h>
     40 #include <sys/ddt.h>
     41 #include <sys/trim_map.h>
     42 #include <sys/blkptr.h>
     43 #include <sys/zfeature.h>
     44 #include <sys/metaslab_impl.h>
     45 
     46 SYSCTL_DECL(_vfs_zfs);
     47 SYSCTL_NODE(_vfs_zfs, OID_AUTO, zio, CTLFLAG_RW, 0, "ZFS ZIO");
     48 #ifdef __NetBSD__
     49 const int zio_use_uma = 1;
     50 #else
     51 #if defined(__amd64__)
     52 static int zio_use_uma = 1;
     53 #else
     54 static int zio_use_uma = 0;
     55 #endif
     56 #endif
     57 SYSCTL_INT(_vfs_zfs_zio, OID_AUTO, use_uma, CTLFLAG_RDTUN, &zio_use_uma, 0,
     58     "Use uma(9) for ZIO allocations");
     59 static int zio_exclude_metadata = 0;
     60 SYSCTL_INT(_vfs_zfs_zio, OID_AUTO, exclude_metadata, CTLFLAG_RDTUN, &zio_exclude_metadata, 0,
     61     "Exclude metadata buffers from dumps as well");
     62 
     63 zio_trim_stats_t zio_trim_stats = {
     64 	{ "bytes",		KSTAT_DATA_UINT64,
     65 	  "Number of bytes successfully TRIMmed" },
     66 	{ "success",		KSTAT_DATA_UINT64,
     67 	  "Number of successful TRIM requests" },
     68 	{ "unsupported",	KSTAT_DATA_UINT64,
     69 	  "Number of TRIM requests that failed because TRIM is not supported" },
     70 	{ "failed",		KSTAT_DATA_UINT64,
     71 	  "Number of TRIM requests that failed for reasons other than not supported" },
     72 };
     73 
     74 static kstat_t *zio_trim_ksp;
     75 
     76 /*
     77  * ==========================================================================
     78  * I/O type descriptions
     79  * ==========================================================================
     80  */
     81 const char *zio_type_name[ZIO_TYPES] = {
     82 	"zio_null", "zio_read", "zio_write", "zio_free", "zio_claim",
     83 	"zio_ioctl"
     84 };
     85 
     86 boolean_t zio_dva_throttle_enabled = B_TRUE;
     87 SYSCTL_INT(_vfs_zfs_zio, OID_AUTO, dva_throttle_enabled, CTLFLAG_RDTUN,
     88     &zio_dva_throttle_enabled, 0, "");
     89 
     90 /*
     91  * ==========================================================================
     92  * I/O kmem caches
     93  * ==========================================================================
     94  */
     95 kmem_cache_t *zio_cache;
     96 kmem_cache_t *zio_link_cache;
     97 kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
     98 kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
     99 
    100 #ifdef _KERNEL
    101 extern vmem_t *zio_alloc_arena;
    102 #endif
    103 
/* Return codes for pipeline stage functions. */
#define	ZIO_PIPELINE_CONTINUE		0x100
#define	ZIO_PIPELINE_STOP		0x101

/*
 * BP_SPANB evaluates to 2^(level * (indblkshift - SPA_BLKPTRSHIFT)) as a
 * 64-bit value.
 */
#define	BP_SPANB(indblkshift, level)					\
	((uint64_t)1 << ((level) * ((indblkshift) - SPA_BLKPTRSHIFT)))

#define	COMPARE_META_LEVEL	0x80000000ul
    110 /*
    111  * The following actions directly affect the spa's sync-to-convergence logic.
    112  * The values below define the sync pass when we start performing the action.
    113  * Care should be taken when changing these values as they directly impact
    114  * spa_sync() performance. Tuning these values may introduce subtle performance
    115  * pathologies and should only be done in the context of performance analysis.
    116  * These tunables will eventually be removed and replaced with #defines once
    117  * enough analysis has been done to determine optimal values.
    118  *
    119  * The 'zfs_sync_pass_deferred_free' pass must be greater than 1 to ensure that
    120  * regular blocks are not deferred.
    121  */
    122 int zfs_sync_pass_deferred_free = 2; /* defer frees starting in this pass */
    123 SYSCTL_INT(_vfs_zfs, OID_AUTO, sync_pass_deferred_free, CTLFLAG_RDTUN,
    124     &zfs_sync_pass_deferred_free, 0, "defer frees starting in this pass");
    125 int zfs_sync_pass_dont_compress = 5; /* don't compress starting in this pass */
    126 SYSCTL_INT(_vfs_zfs, OID_AUTO, sync_pass_dont_compress, CTLFLAG_RDTUN,
    127     &zfs_sync_pass_dont_compress, 0, "don't compress starting in this pass");
    128 int zfs_sync_pass_rewrite = 2; /* rewrite new bps starting in this pass */
    129 SYSCTL_INT(_vfs_zfs, OID_AUTO, sync_pass_rewrite, CTLFLAG_RDTUN,
    130     &zfs_sync_pass_rewrite, 0, "rewrite new bps starting in this pass");
    131 
    132 /*
    133  * An allocating zio is one that either currently has the DVA allocate
    134  * stage set or will have it later in its lifetime.
    135  */
    136 #define	IO_IS_ALLOCATING(zio) ((zio)->io_orig_pipeline & ZIO_STAGE_DVA_ALLOCATE)
    137 
    138 boolean_t	zio_requeue_io_start_cut_in_line = B_TRUE;
    139 
    140 #ifdef illumos
    141 #ifdef ZFS_DEBUG
    142 int zio_buf_debug_limit = 16384;
    143 #else
    144 int zio_buf_debug_limit = 0;
    145 #endif
    146 #endif
    147 
    148 static void zio_taskq_dispatch(zio_t *, zio_taskq_type_t, boolean_t);
    149 
    150 void
    151 zio_init(void)
    152 {
    153 	size_t c;
    154 	zio_cache = kmem_cache_create("zio_cache",
    155 	    sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
    156 	zio_link_cache = kmem_cache_create("zio_link_cache",
    157 	    sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
    158 
    159 	if (!zio_use_uma)
    160 		goto out;
    161 
    162 	/*
    163 	 * For small buffers, we want a cache for each multiple of
    164 	 * SPA_MINBLOCKSIZE.  For larger buffers, we want a cache
    165 	 * for each quarter-power of 2.
    166 	 */
    167 	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
    168 		size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
    169 		size_t p2 = size;
    170 		size_t align = 0;
    171 		int cflags = zio_exclude_metadata ? KMC_NODEBUG : 0;
    172 
    173 		while (!ISP2(p2))
    174 			p2 &= p2 - 1;
    175 
    176 #ifdef illumos
    177 #ifndef _KERNEL
    178 		/*
    179 		 * If we are using watchpoints, put each buffer on its own page,
    180 		 * to eliminate the performance overhead of trapping to the
    181 		 * kernel when modifying a non-watched buffer that shares the
    182 		 * page with a watched buffer.
    183 		 */
    184 		if (arc_watch && !IS_P2ALIGNED(size, PAGESIZE))
    185 			continue;
    186 #endif
    187 #endif /* illumos */
    188 		if (size <= 4 * SPA_MINBLOCKSIZE) {
    189 			align = SPA_MINBLOCKSIZE;
    190 		} else if (IS_P2ALIGNED(size, p2 >> 2)) {
    191 			align = MIN(p2 >> 2, PAGESIZE);
    192 		}
    193 
    194 		if (align != 0) {
    195 			char name[36];
    196 			(void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
    197 			zio_buf_cache[c] = kmem_cache_create(name, size,
    198 			    align, NULL, NULL, NULL, NULL, NULL, cflags);
    199 
    200 			/*
    201 			 * Since zio_data bufs do not appear in crash dumps, we
    202 			 * pass KMC_NOTOUCH so that no allocator metadata is
    203 			 * stored with the buffers.
    204 			 */
    205 			(void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
    206 			zio_data_buf_cache[c] = kmem_cache_create(name, size,
    207 			    align, NULL, NULL, NULL, NULL, NULL,
    208 			    cflags | KMC_NOTOUCH | KMC_NODEBUG);
    209 		}
    210 	}
    211 
    212 	while (--c != 0) {
    213 		ASSERT(zio_buf_cache[c] != NULL);
    214 		if (zio_buf_cache[c - 1] == NULL)
    215 			zio_buf_cache[c - 1] = zio_buf_cache[c];
    216 
    217 		ASSERT(zio_data_buf_cache[c] != NULL);
    218 		if (zio_data_buf_cache[c - 1] == NULL)
    219 			zio_data_buf_cache[c - 1] = zio_data_buf_cache[c];
    220 	}
    221 out:
    222 
    223 	zio_inject_init();
    224 
    225 	zio_trim_ksp = kstat_create("zfs", 0, "zio_trim", "misc",
    226 	    KSTAT_TYPE_NAMED,
    227 	    sizeof(zio_trim_stats) / sizeof(kstat_named_t),
    228 	    KSTAT_FLAG_VIRTUAL);
    229 
    230 	if (zio_trim_ksp != NULL) {
    231 		zio_trim_ksp->ks_data = &zio_trim_stats;
    232 		kstat_install(zio_trim_ksp);
    233 	}
    234 }
    235 
    236 void
    237 zio_fini(void)
    238 {
    239 	size_t c;
    240 	kmem_cache_t *last_cache = NULL;
    241 	kmem_cache_t *last_data_cache = NULL;
    242 
    243 	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
    244 		if (zio_buf_cache[c] != last_cache) {
    245 			last_cache = zio_buf_cache[c];
    246 			kmem_cache_destroy(zio_buf_cache[c]);
    247 		}
    248 		zio_buf_cache[c] = NULL;
    249 
    250 		if (zio_data_buf_cache[c] != last_data_cache) {
    251 			last_data_cache = zio_data_buf_cache[c];
    252 			kmem_cache_destroy(zio_data_buf_cache[c]);
    253 		}
    254 		zio_data_buf_cache[c] = NULL;
    255 	}
    256 
    257 	kmem_cache_destroy(zio_link_cache);
    258 	kmem_cache_destroy(zio_cache);
    259 
    260 	zio_inject_fini();
    261 
    262 	if (zio_trim_ksp != NULL) {
    263 		kstat_delete(zio_trim_ksp);
    264 		zio_trim_ksp = NULL;
    265 	}
    266 }
    267 
    268 /*
    269  * ==========================================================================
    270  * Allocate and free I/O buffers
    271  * ==========================================================================
    272  */
    273 
    274 /*
    275  * Use zio_buf_alloc to allocate ZFS metadata.  This data will appear in a
    276  * crashdump if the kernel panics, so use it judiciously.  Obviously, it's
    277  * useful to inspect ZFS metadata, but if possible, we should avoid keeping
    278  * excess / transient data in-core during a crashdump.
    279  */
    280 static void *
    281 zio_buf_alloc_impl(size_t size, boolean_t canwait)
    282 {
    283 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
    284 	int flags = zio_exclude_metadata ? KM_NODEBUG : 0;
    285 
    286 	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
    287 
    288 	if (zio_use_uma) {
    289 		return (kmem_cache_alloc(zio_buf_cache[c],
    290 		    canwait ? KM_PUSHPAGE : KM_NOSLEEP));
    291 	} else {
    292 		return (kmem_alloc(size,
    293 		    (canwait ? KM_SLEEP : KM_NOSLEEP) | flags));
    294 	}
    295 }
    296 
    297 void *
    298 zio_buf_alloc(size_t size)
    299 {
    300 	return (zio_buf_alloc_impl(size, B_TRUE));
    301 }
    302 
    303 void *
    304 zio_buf_alloc_nowait(size_t size)
    305 {
    306 	return (zio_buf_alloc_impl(size, B_FALSE));
    307 }
    308 
    309 /*
    310  * Use zio_data_buf_alloc to allocate data.  The data will not appear in a
    311  * crashdump if the kernel panics.  This exists so that we will limit the amount
    312  * of ZFS data that shows up in a kernel crashdump.  (Thus reducing the amount
    313  * of kernel heap dumped to disk when the kernel panics)
    314  */
    315 void *
    316 zio_data_buf_alloc(size_t size)
    317 {
    318 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
    319 
    320 	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
    321 
    322 	if (zio_use_uma)
    323 		return (kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE));
    324 	else
    325 		return (kmem_alloc(size, KM_SLEEP | KM_NODEBUG));
    326 }
    327 
    328 void
    329 zio_buf_free(void *buf, size_t size)
    330 {
    331 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
    332 
    333 	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
    334 
    335 	if (zio_use_uma)
    336 		kmem_cache_free(zio_buf_cache[c], buf);
    337 	else
    338 		kmem_free(buf, size);
    339 }
    340 
    341 void
    342 zio_data_buf_free(void *buf, size_t size)
    343 {
    344 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
    345 
    346 	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
    347 
    348 	if (zio_use_uma)
    349 		kmem_cache_free(zio_data_buf_cache[c], buf);
    350 	else
    351 		kmem_free(buf, size);
    352 }
    353 
    354 /*
    355  * ==========================================================================
    356  * Push and pop I/O transform buffers
    357  * ==========================================================================
    358  */
    359 void
    360 zio_push_transform(zio_t *zio, void *data, uint64_t size, uint64_t bufsize,
    361     zio_transform_func_t *transform)
    362 {
    363 	zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_SLEEP);
    364 
    365 	zt->zt_orig_data = zio->io_data;
    366 	zt->zt_orig_size = zio->io_size;
    367 	zt->zt_bufsize = bufsize;
    368 	zt->zt_transform = transform;
    369 
    370 	zt->zt_next = zio->io_transform_stack;
    371 	zio->io_transform_stack = zt;
    372 
    373 	zio->io_data = data;
    374 	zio->io_size = size;
    375 }
    376 
    377 void
    378 zio_pop_transforms(zio_t *zio)
    379 {
    380 	zio_transform_t *zt;
    381 
    382 	while ((zt = zio->io_transform_stack) != NULL) {
    383 		if (zt->zt_transform != NULL)
    384 			zt->zt_transform(zio,
    385 			    zt->zt_orig_data, zt->zt_orig_size);
    386 
    387 		if (zt->zt_bufsize != 0)
    388 			zio_buf_free(zio->io_data, zt->zt_bufsize);
    389 
    390 		zio->io_data = zt->zt_orig_data;
    391 		zio->io_size = zt->zt_orig_size;
    392 		zio->io_transform_stack = zt->zt_next;
    393 
    394 		kmem_free(zt, sizeof (zio_transform_t));
    395 	}
    396 }
    397 
    398 /*
    399  * ==========================================================================
    400  * I/O transform callbacks for subblocks and decompression
    401  * ==========================================================================
    402  */
    403 static void
    404 zio_subblock(zio_t *zio, void *data, uint64_t size)
    405 {
    406 	ASSERT(zio->io_size > size);
    407 
    408 	if (zio->io_type == ZIO_TYPE_READ)
    409 		bcopy(zio->io_data, data, size);
    410 }
    411 
    412 static void
    413 zio_decompress(zio_t *zio, void *data, uint64_t size)
    414 {
    415 	if (zio->io_error == 0 &&
    416 	    zio_decompress_data(BP_GET_COMPRESS(zio->io_bp),
    417 	    zio->io_data, data, zio->io_size, size) != 0)
    418 		zio->io_error = SET_ERROR(EIO);
    419 }
    420 
    421 /*
    422  * ==========================================================================
    423  * I/O parent/child relationships and pipeline interlocks
    424  * ==========================================================================
    425  */
    426 zio_t *
    427 zio_walk_parents(zio_t *cio, zio_link_t **zl)
    428 {
    429 	list_t *pl = &cio->io_parent_list;
    430 
    431 	*zl = (*zl == NULL) ? list_head(pl) : list_next(pl, *zl);
    432 	if (*zl == NULL)
    433 		return (NULL);
    434 
    435 	ASSERT((*zl)->zl_child == cio);
    436 	return ((*zl)->zl_parent);
    437 }
    438 
    439 zio_t *
    440 zio_walk_children(zio_t *pio, zio_link_t **zl)
    441 {
    442 	list_t *cl = &pio->io_child_list;
    443 
    444 	*zl = (*zl == NULL) ? list_head(cl) : list_next(cl, *zl);
    445 	if (*zl == NULL)
    446 		return (NULL);
    447 
    448 	ASSERT((*zl)->zl_parent == pio);
    449 	return ((*zl)->zl_child);
    450 }
    451 
    452 zio_t *
    453 zio_unique_parent(zio_t *cio)
    454 {
    455 	zio_link_t *zl = NULL;
    456 	zio_t *pio = zio_walk_parents(cio, &zl);
    457 
    458 	VERIFY3P(zio_walk_parents(cio, &zl), ==, NULL);
    459 	return (pio);
    460 }
    461 
    462 void
    463 zio_add_child(zio_t *pio, zio_t *cio)
    464 {
    465 	zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
    466 
    467 	/*
    468 	 * Logical I/Os can have logical, gang, or vdev children.
    469 	 * Gang I/Os can have gang or vdev children.
    470 	 * Vdev I/Os can only have vdev children.
    471 	 * The following ASSERT captures all of these constraints.
    472 	 */
    473 	ASSERT(cio->io_child_type <= pio->io_child_type);
    474 
    475 	zl->zl_parent = pio;
    476 	zl->zl_child = cio;
    477 
    478 	mutex_enter(&cio->io_lock);
    479 	mutex_enter(&pio->io_lock);
    480 
    481 	ASSERT(pio->io_state[ZIO_WAIT_DONE] == 0);
    482 
    483 	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
    484 		pio->io_children[cio->io_child_type][w] += !cio->io_state[w];
    485 
    486 	list_insert_head(&pio->io_child_list, zl);
    487 	list_insert_head(&cio->io_parent_list, zl);
    488 
    489 	pio->io_child_count++;
    490 	cio->io_parent_count++;
    491 
    492 	mutex_exit(&pio->io_lock);
    493 	mutex_exit(&cio->io_lock);
    494 }
    495 
    496 static void
    497 zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
    498 {
    499 	ASSERT(zl->zl_parent == pio);
    500 	ASSERT(zl->zl_child == cio);
    501 
    502 	mutex_enter(&cio->io_lock);
    503 	mutex_enter(&pio->io_lock);
    504 
    505 	list_remove(&pio->io_child_list, zl);
    506 	list_remove(&cio->io_parent_list, zl);
    507 
    508 	pio->io_child_count--;
    509 	cio->io_parent_count--;
    510 
    511 	mutex_exit(&pio->io_lock);
    512 	mutex_exit(&cio->io_lock);
    513 
    514 	kmem_cache_free(zio_link_cache, zl);
    515 }
    516 
    517 static boolean_t
    518 zio_wait_for_children(zio_t *zio, enum zio_child child, enum zio_wait_type wait)
    519 {
    520 	uint64_t *countp = &zio->io_children[child][wait];
    521 	boolean_t waiting = B_FALSE;
    522 
    523 	mutex_enter(&zio->io_lock);
    524 	ASSERT(zio->io_stall == NULL);
    525 	if (*countp != 0) {
    526 		zio->io_stage >>= 1;
    527 		ASSERT3U(zio->io_stage, !=, ZIO_STAGE_OPEN);
    528 		zio->io_stall = countp;
    529 		waiting = B_TRUE;
    530 	}
    531 	mutex_exit(&zio->io_lock);
    532 
    533 	return (waiting);
    534 }
    535 
    536 static void
    537 zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
    538 {
    539 	uint64_t *countp = &pio->io_children[zio->io_child_type][wait];
    540 	int *errorp = &pio->io_child_error[zio->io_child_type];
    541 
    542 	mutex_enter(&pio->io_lock);
    543 	if (zio->io_error && !(zio->io_flags & ZIO_FLAG_DONT_PROPAGATE))
    544 		*errorp = zio_worst_error(*errorp, zio->io_error);
    545 	pio->io_reexecute |= zio->io_reexecute;
    546 	ASSERT3U(*countp, >, 0);
    547 
    548 	(*countp)--;
    549 
    550 	if (*countp == 0 && pio->io_stall == countp) {
    551 		zio_taskq_type_t type =
    552 		    pio->io_stage < ZIO_STAGE_VDEV_IO_START ? ZIO_TASKQ_ISSUE :
    553 		    ZIO_TASKQ_INTERRUPT;
    554 		pio->io_stall = NULL;
    555 		mutex_exit(&pio->io_lock);
    556 		/*
    557 		 * Dispatch the parent zio in its own taskq so that
    558 		 * the child can continue to make progress. This also
    559 		 * prevents overflowing the stack when we have deeply nested
    560 		 * parent-child relationships.
    561 		 */
    562 		zio_taskq_dispatch(pio, type, B_FALSE);
    563 	} else {
    564 		mutex_exit(&pio->io_lock);
    565 	}
    566 }
    567 
    568 static void
    569 zio_inherit_child_errors(zio_t *zio, enum zio_child c)
    570 {
    571 	if (zio->io_child_error[c] != 0 && zio->io_error == 0)
    572 		zio->io_error = zio->io_child_error[c];
    573 }
    574 
    575 int
    576 zio_timestamp_compare(const void *x1, const void *x2)
    577 {
    578 	const zio_t *z1 = x1;
    579 	const zio_t *z2 = x2;
    580 
    581 	if (z1->io_queued_timestamp < z2->io_queued_timestamp)
    582 		return (-1);
    583 	if (z1->io_queued_timestamp > z2->io_queued_timestamp)
    584 		return (1);
    585 
    586 	if (z1->io_bookmark.zb_objset < z2->io_bookmark.zb_objset)
    587 		return (-1);
    588 	if (z1->io_bookmark.zb_objset > z2->io_bookmark.zb_objset)
    589 		return (1);
    590 
    591 	if (z1->io_bookmark.zb_object < z2->io_bookmark.zb_object)
    592 		return (-1);
    593 	if (z1->io_bookmark.zb_object > z2->io_bookmark.zb_object)
    594 		return (1);
    595 
    596 	if (z1->io_bookmark.zb_level < z2->io_bookmark.zb_level)
    597 		return (-1);
    598 	if (z1->io_bookmark.zb_level > z2->io_bookmark.zb_level)
    599 		return (1);
    600 
    601 	if (z1->io_bookmark.zb_blkid < z2->io_bookmark.zb_blkid)
    602 		return (-1);
    603 	if (z1->io_bookmark.zb_blkid > z2->io_bookmark.zb_blkid)
    604 		return (1);
    605 
    606 	if (z1 < z2)
    607 		return (-1);
    608 	if (z1 > z2)
    609 		return (1);
    610 
    611 	return (0);
    612 }
    613 
    614 /*
    615  * ==========================================================================
    616  * Create the various types of I/O (read, write, free, etc)
    617  * ==========================================================================
    618  */
    619 static zio_t *
    620 zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
    621     void *data, uint64_t size, zio_done_func_t *done, void *private,
    622     zio_type_t type, zio_priority_t priority, enum zio_flag flags,
    623     vdev_t *vd, uint64_t offset, const zbookmark_phys_t *zb,
    624     enum zio_stage stage, enum zio_stage pipeline)
    625 {
    626 	zio_t *zio;
    627 
    628 	ASSERT3U(type == ZIO_TYPE_FREE || size, <=, SPA_MAXBLOCKSIZE);
    629 	ASSERT(P2PHASE(size, SPA_MINBLOCKSIZE) == 0);
    630 	ASSERT(P2PHASE(offset, SPA_MINBLOCKSIZE) == 0);
    631 
    632 	ASSERT(!vd || spa_config_held(spa, SCL_STATE_ALL, RW_READER));
    633 	ASSERT(!bp || !(flags & ZIO_FLAG_CONFIG_WRITER));
    634 	ASSERT(vd || stage == ZIO_STAGE_OPEN);
    635 
    636 	zio = kmem_cache_alloc(zio_cache, KM_SLEEP);
    637 	bzero(zio, sizeof (zio_t));
    638 
    639 	mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
    640 	cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
    641 
    642 	list_create(&zio->io_parent_list, sizeof (zio_link_t),
    643 	    offsetof(zio_link_t, zl_parent_node));
    644 	list_create(&zio->io_child_list, sizeof (zio_link_t),
    645 	    offsetof(zio_link_t, zl_child_node));
    646 	metaslab_trace_init(&zio->io_alloc_list);
    647 
    648 	if (vd != NULL)
    649 		zio->io_child_type = ZIO_CHILD_VDEV;
    650 	else if (flags & ZIO_FLAG_GANG_CHILD)
    651 		zio->io_child_type = ZIO_CHILD_GANG;
    652 	else if (flags & ZIO_FLAG_DDT_CHILD)
    653 		zio->io_child_type = ZIO_CHILD_DDT;
    654 	else
    655 		zio->io_child_type = ZIO_CHILD_LOGICAL;
    656 
    657 	if (bp != NULL) {
    658 		zio->io_bp = (blkptr_t *)bp;
    659 		zio->io_bp_copy = *bp;
    660 		zio->io_bp_orig = *bp;
    661 		if (type != ZIO_TYPE_WRITE ||
    662 		    zio->io_child_type == ZIO_CHILD_DDT)
    663 			zio->io_bp = &zio->io_bp_copy;	/* so caller can free */
    664 		if (zio->io_child_type == ZIO_CHILD_LOGICAL)
    665 			zio->io_logical = zio;
    666 		if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
    667 			pipeline |= ZIO_GANG_STAGES;
    668 	}
    669 
    670 	zio->io_spa = spa;
    671 	zio->io_txg = txg;
    672 	zio->io_done = done;
    673 	zio->io_private = private;
    674 	zio->io_type = type;
    675 	zio->io_priority = priority;
    676 	zio->io_vd = vd;
    677 	zio->io_offset = offset;
    678 	zio->io_orig_data = zio->io_data = data;
    679 	zio->io_orig_size = zio->io_size = size;
    680 	zio->io_orig_flags = zio->io_flags = flags;
    681 	zio->io_orig_stage = zio->io_stage = stage;
    682 	zio->io_orig_pipeline = zio->io_pipeline = pipeline;
    683 	zio->io_pipeline_trace = ZIO_STAGE_OPEN;
    684 
    685 	zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
    686 	zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
    687 
    688 	if (zb != NULL)
    689 		zio->io_bookmark = *zb;
    690 
    691 	if (pio != NULL) {
    692 		if (zio->io_logical == NULL)
    693 			zio->io_logical = pio->io_logical;
    694 		if (zio->io_child_type == ZIO_CHILD_GANG)
    695 			zio->io_gang_leader = pio->io_gang_leader;
    696 		zio_add_child(pio, zio);
    697 	}
    698 
    699 	return (zio);
    700 }
    701 
    702 static void
    703 zio_destroy(zio_t *zio)
    704 {
    705 	metaslab_trace_fini(&zio->io_alloc_list);
    706 	list_destroy(&zio->io_parent_list);
    707 	list_destroy(&zio->io_child_list);
    708 	mutex_destroy(&zio->io_lock);
    709 	cv_destroy(&zio->io_cv);
    710 	kmem_cache_free(zio_cache, zio);
    711 }
    712 
    713 zio_t *
    714 zio_null(zio_t *pio, spa_t *spa, vdev_t *vd, zio_done_func_t *done,
    715     void *private, enum zio_flag flags)
    716 {
    717 	zio_t *zio;
    718 
    719 	zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
    720 	    ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
    721 	    ZIO_STAGE_OPEN, ZIO_INTERLOCK_PIPELINE);
    722 
    723 	return (zio);
    724 }
    725 
    726 zio_t *
    727 zio_root(spa_t *spa, zio_done_func_t *done, void *private, enum zio_flag flags)
    728 {
    729 	return (zio_null(NULL, spa, NULL, done, private, flags));
    730 }
    731 
    732 void
    733 zfs_blkptr_verify(spa_t *spa, const blkptr_t *bp)
    734 {
    735 	if (!DMU_OT_IS_VALID(BP_GET_TYPE(bp))) {
    736 		zfs_panic_recover("blkptr at %p has invalid TYPE %llu",
    737 		    bp, (longlong_t)BP_GET_TYPE(bp));
    738 	}
    739 	if (BP_GET_CHECKSUM(bp) >= ZIO_CHECKSUM_FUNCTIONS ||
    740 	    BP_GET_CHECKSUM(bp) <= ZIO_CHECKSUM_ON) {
    741 		zfs_panic_recover("blkptr at %p has invalid CHECKSUM %llu",
    742 		    bp, (longlong_t)BP_GET_CHECKSUM(bp));
    743 	}
    744 	if (BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_FUNCTIONS ||
    745 	    BP_GET_COMPRESS(bp) <= ZIO_COMPRESS_ON) {
    746 		zfs_panic_recover("blkptr at %p has invalid COMPRESS %llu",
    747 		    bp, (longlong_t)BP_GET_COMPRESS(bp));
    748 	}
    749 	if (BP_GET_LSIZE(bp) > SPA_MAXBLOCKSIZE) {
    750 		zfs_panic_recover("blkptr at %p has invalid LSIZE %llu",
    751 		    bp, (longlong_t)BP_GET_LSIZE(bp));
    752 	}
    753 	if (BP_GET_PSIZE(bp) > SPA_MAXBLOCKSIZE) {
    754 		zfs_panic_recover("blkptr at %p has invalid PSIZE %llu",
    755 		    bp, (longlong_t)BP_GET_PSIZE(bp));
    756 	}
    757 
    758 	if (BP_IS_EMBEDDED(bp)) {
    759 		if (BPE_GET_ETYPE(bp) > NUM_BP_EMBEDDED_TYPES) {
    760 			zfs_panic_recover("blkptr at %p has invalid ETYPE %llu",
    761 			    bp, (longlong_t)BPE_GET_ETYPE(bp));
    762 		}
    763 	}
    764 
    765 	/*
    766 	 * Pool-specific checks.
    767 	 *
    768 	 * Note: it would be nice to verify that the blk_birth and
    769 	 * BP_PHYSICAL_BIRTH() are not too large.  However, spa_freeze()
    770 	 * allows the birth time of log blocks (and dmu_sync()-ed blocks
    771 	 * that are in the log) to be arbitrarily large.
    772 	 */
    773 	for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
    774 		uint64_t vdevid = DVA_GET_VDEV(&bp->blk_dva[i]);
    775 		if (vdevid >= spa->spa_root_vdev->vdev_children) {
    776 			zfs_panic_recover("blkptr at %p DVA %u has invalid "
    777 			    "VDEV %llu",
    778 			    bp, i, (longlong_t)vdevid);
    779 			continue;
    780 		}
    781 		vdev_t *vd = spa->spa_root_vdev->vdev_child[vdevid];
    782 		if (vd == NULL) {
    783 			zfs_panic_recover("blkptr at %p DVA %u has invalid "
    784 			    "VDEV %llu",
    785 			    bp, i, (longlong_t)vdevid);
    786 			continue;
    787 		}
    788 		if (vd->vdev_ops == &vdev_hole_ops) {
    789 			zfs_panic_recover("blkptr at %p DVA %u has hole "
    790 			    "VDEV %llu",
    791 			    bp, i, (longlong_t)vdevid);
    792 			continue;
    793 		}
    794 		if (vd->vdev_ops == &vdev_missing_ops) {
    795 			/*
    796 			 * "missing" vdevs are valid during import, but we
    797 			 * don't have their detailed info (e.g. asize), so
    798 			 * we can't perform any more checks on them.
    799 			 */
    800 			continue;
    801 		}
    802 		uint64_t offset = DVA_GET_OFFSET(&bp->blk_dva[i]);
    803 		uint64_t asize = DVA_GET_ASIZE(&bp->blk_dva[i]);
    804 		if (BP_IS_GANG(bp))
    805 			asize = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
    806 		if (offset + asize > vd->vdev_asize) {
    807 			zfs_panic_recover("blkptr at %p DVA %u has invalid "
    808 			    "OFFSET %llu",
    809 			    bp, i, (longlong_t)offset);
    810 		}
    811 	}
    812 }
    813 
    814 zio_t *
    815 zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
    816     void *data, uint64_t size, zio_done_func_t *done, void *private,
    817     zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
    818 {
    819 	zio_t *zio;
    820 
    821 	zfs_blkptr_verify(spa, bp);
    822 
    823 	zio = zio_create(pio, spa, BP_PHYSICAL_BIRTH(bp), bp,
    824 	    data, size, done, private,
    825 	    ZIO_TYPE_READ, priority, flags, NULL, 0, zb,
    826 	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
    827 	    ZIO_DDT_CHILD_READ_PIPELINE : ZIO_READ_PIPELINE);
    828 
    829 	return (zio);
    830 }
    831 
    832 zio_t *
    833 zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
    834     void *data, uint64_t size, const zio_prop_t *zp,
    835     zio_done_func_t *ready, zio_done_func_t *children_ready,
    836     zio_done_func_t *physdone, zio_done_func_t *done,
    837     void *private, zio_priority_t priority, enum zio_flag flags,
    838     const zbookmark_phys_t *zb)
    839 {
    840 	zio_t *zio;
    841 
    842 	ASSERT(zp->zp_checksum >= ZIO_CHECKSUM_OFF &&
    843 	    zp->zp_checksum < ZIO_CHECKSUM_FUNCTIONS &&
    844 	    zp->zp_compress >= ZIO_COMPRESS_OFF &&
    845 	    zp->zp_compress < ZIO_COMPRESS_FUNCTIONS &&
    846 	    DMU_OT_IS_VALID(zp->zp_type) &&
    847 	    zp->zp_level < 32 &&
    848 	    zp->zp_copies > 0 &&
    849 	    zp->zp_copies <= spa_max_replication(spa));
    850 
    851 	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
    852 	    ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
    853 	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
    854 	    ZIO_DDT_CHILD_WRITE_PIPELINE : ZIO_WRITE_PIPELINE);
    855 
    856 	zio->io_ready = ready;
    857 	zio->io_children_ready = children_ready;
    858 	zio->io_physdone = physdone;
    859 	zio->io_prop = *zp;
    860 
    861 	/*
    862 	 * Data can be NULL if we are going to call zio_write_override() to
    863 	 * provide the already-allocated BP.  But we may need the data to
    864 	 * verify a dedup hit (if requested).  In this case, don't try to
    865 	 * dedup (just take the already-allocated BP verbatim).
    866 	 */
    867 	if (data == NULL && zio->io_prop.zp_dedup_verify) {
    868 		zio->io_prop.zp_dedup = zio->io_prop.zp_dedup_verify = B_FALSE;
    869 	}
    870 
    871 	return (zio);
    872 }
    873 
    874 zio_t *
    875 zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, void *data,
    876     uint64_t size, zio_done_func_t *done, void *private,
    877     zio_priority_t priority, enum zio_flag flags, zbookmark_phys_t *zb)
    878 {
    879 	zio_t *zio;
    880 
    881 	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
    882 	    ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_IO_REWRITE, NULL, 0, zb,
    883 	    ZIO_STAGE_OPEN, ZIO_REWRITE_PIPELINE);
    884 
    885 	return (zio);
    886 }
    887 
    888 void
    889 zio_write_override(zio_t *zio, blkptr_t *bp, int copies, boolean_t nopwrite)
    890 {
    891 	ASSERT(zio->io_type == ZIO_TYPE_WRITE);
    892 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
    893 	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
    894 	ASSERT(zio->io_txg == spa_syncing_txg(zio->io_spa));
    895 
    896 	/*
    897 	 * We must reset the io_prop to match the values that existed
    898 	 * when the bp was first written by dmu_sync() keeping in mind
    899 	 * that nopwrite and dedup are mutually exclusive.
    900 	 */
    901 	zio->io_prop.zp_dedup = nopwrite ? B_FALSE : zio->io_prop.zp_dedup;
    902 	zio->io_prop.zp_nopwrite = nopwrite;
    903 	zio->io_prop.zp_copies = copies;
    904 	zio->io_bp_override = bp;
    905 }
    906 
    907 void
    908 zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
    909 {
    910 
    911 	/*
    912 	 * The check for EMBEDDED is a performance optimization.  We
    913 	 * process the free here (by ignoring it) rather than
    914 	 * putting it on the list and then processing it in zio_free_sync().
    915 	 */
    916 	if (BP_IS_EMBEDDED(bp))
    917 		return;
    918 	metaslab_check_free(spa, bp);
    919 
    920 	/*
    921 	 * Frees that are for the currently-syncing txg, are not going to be
    922 	 * deferred, and which will not need to do a read (i.e. not GANG or
    923 	 * DEDUP), can be processed immediately.  Otherwise, put them on the
    924 	 * in-memory list for later processing.
    925 	 */
    926 	if (zfs_trim_enabled || BP_IS_GANG(bp) || BP_GET_DEDUP(bp) ||
    927 	    txg != spa->spa_syncing_txg ||
    928 	    spa_sync_pass(spa) >= zfs_sync_pass_deferred_free) {
    929 		bplist_append(&spa->spa_free_bplist[txg & TXG_MASK], bp);
    930 	} else {
    931 		VERIFY0(zio_wait(zio_free_sync(NULL, spa, txg, bp,
    932 		    BP_GET_PSIZE(bp), 0)));
    933 	}
    934 }
    935 
    936 zio_t *
    937 zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
    938     uint64_t size, enum zio_flag flags)
    939 {
    940 	zio_t *zio;
    941 	enum zio_stage stage = ZIO_FREE_PIPELINE;
    942 
    943 	ASSERT(!BP_IS_HOLE(bp));
    944 	ASSERT(spa_syncing_txg(spa) == txg);
    945 	ASSERT(spa_sync_pass(spa) < zfs_sync_pass_deferred_free);
    946 
    947 	if (BP_IS_EMBEDDED(bp))
    948 		return (zio_null(pio, spa, NULL, NULL, NULL, 0));
    949 
    950 	metaslab_check_free(spa, bp);
    951 	arc_freed(spa, bp);
    952 
    953 	if (zfs_trim_enabled)
    954 		stage |= ZIO_STAGE_ISSUE_ASYNC | ZIO_STAGE_VDEV_IO_START |
    955 		    ZIO_STAGE_VDEV_IO_ASSESS;
    956 	/*
    957 	 * GANG and DEDUP blocks can induce a read (for the gang block header,
    958 	 * or the DDT), so issue them asynchronously so that this thread is
    959 	 * not tied up.
    960 	 */
    961 	else if (BP_IS_GANG(bp) || BP_GET_DEDUP(bp))
    962 		stage |= ZIO_STAGE_ISSUE_ASYNC;
    963 
    964 	flags |= ZIO_FLAG_DONT_QUEUE;
    965 
    966 	zio = zio_create(pio, spa, txg, bp, NULL, size,
    967 	    NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_NOW, flags,
    968 	    NULL, 0, NULL, ZIO_STAGE_OPEN, stage);
    969 
    970 	return (zio);
    971 }
    972 
    973 zio_t *
    974 zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
    975     zio_done_func_t *done, void *private, enum zio_flag flags)
    976 {
    977 	zio_t *zio;
    978 
    979 	dprintf_bp(bp, "claiming in txg %llu", txg);
    980 
    981 	if (BP_IS_EMBEDDED(bp))
    982 		return (zio_null(pio, spa, NULL, NULL, NULL, 0));
    983 
    984 	/*
    985 	 * A claim is an allocation of a specific block.  Claims are needed
    986 	 * to support immediate writes in the intent log.  The issue is that
    987 	 * immediate writes contain committed data, but in a txg that was
    988 	 * *not* committed.  Upon opening the pool after an unclean shutdown,
    989 	 * the intent log claims all blocks that contain immediate write data
    990 	 * so that the SPA knows they're in use.
    991 	 *
    992 	 * All claims *must* be resolved in the first txg -- before the SPA
    993 	 * starts allocating blocks -- so that nothing is allocated twice.
    994 	 * If txg == 0 we just verify that the block is claimable.
    995 	 */
    996 	ASSERT3U(spa->spa_uberblock.ub_rootbp.blk_birth, <, spa_first_txg(spa));
    997 	ASSERT(txg == spa_first_txg(spa) || txg == 0);
    998 	ASSERT(!BP_GET_DEDUP(bp) || !spa_writeable(spa));	/* zdb(1M) */
    999 
   1000 	zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
   1001 	    done, private, ZIO_TYPE_CLAIM, ZIO_PRIORITY_NOW, flags,
   1002 	    NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_CLAIM_PIPELINE);
   1003 	ASSERT0(zio->io_queued_timestamp);
   1004 
   1005 	return (zio);
   1006 }
   1007 
   1008 zio_t *
   1009 zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd, uint64_t offset,
   1010     uint64_t size, zio_done_func_t *done, void *private,
   1011     zio_priority_t priority, enum zio_flag flags)
   1012 {
   1013 	zio_t *zio;
   1014 	int c;
   1015 
   1016 	if (vd->vdev_children == 0) {
   1017 		zio = zio_create(pio, spa, 0, NULL, NULL, size, done, private,
   1018 		    ZIO_TYPE_IOCTL, priority, flags, vd, offset, NULL,
   1019 		    ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
   1020 
   1021 		zio->io_cmd = cmd;
   1022 	} else {
   1023 		zio = zio_null(pio, spa, NULL, NULL, NULL, flags);
   1024 
   1025 		for (c = 0; c < vd->vdev_children; c++)
   1026 			zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
   1027 			    offset, size, done, private, priority, flags));
   1028 	}
   1029 
   1030 	return (zio);
   1031 }
   1032 
   1033 zio_t *
   1034 zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
   1035     void *data, int checksum, zio_done_func_t *done, void *private,
   1036     zio_priority_t priority, enum zio_flag flags, boolean_t labels)
   1037 {
   1038 	zio_t *zio;
   1039 
   1040 	ASSERT(vd->vdev_children == 0);
   1041 	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
   1042 	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
   1043 	ASSERT3U(offset + size, <=, vd->vdev_psize);
   1044 
   1045 	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
   1046 	    ZIO_TYPE_READ, priority, flags | ZIO_FLAG_PHYSICAL, vd, offset,
   1047 	    NULL, ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
   1048 
   1049 	zio->io_prop.zp_checksum = checksum;
   1050 
   1051 	return (zio);
   1052 }
   1053 
   1054 zio_t *
   1055 zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
   1056     void *data, int checksum, zio_done_func_t *done, void *private,
   1057     zio_priority_t priority, enum zio_flag flags, boolean_t labels)
   1058 {
   1059 	zio_t *zio;
   1060 
   1061 	ASSERT(vd->vdev_children == 0);
   1062 	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
   1063 	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
   1064 	ASSERT3U(offset + size, <=, vd->vdev_psize);
   1065 
   1066 	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
   1067 	    ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_PHYSICAL, vd, offset,
   1068 	    NULL, ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
   1069 
   1070 	zio->io_prop.zp_checksum = checksum;
   1071 
   1072 	if (zio_checksum_table[checksum].ci_flags & ZCHECKSUM_FLAG_EMBEDDED) {
   1073 		/*
   1074 		 * zec checksums are necessarily destructive -- they modify
   1075 		 * the end of the write buffer to hold the verifier/checksum.
   1076 		 * Therefore, we must make a local copy in case the data is
   1077 		 * being written to multiple places in parallel.
   1078 		 */
   1079 		void *wbuf = zio_buf_alloc(size);
   1080 		bcopy(data, wbuf, size);
   1081 		zio_push_transform(zio, wbuf, size, size, NULL);
   1082 	}
   1083 
   1084 	return (zio);
   1085 }
   1086 
   1087 /*
   1088  * Create a child I/O to do some work for us.
   1089  */
   1090 zio_t *
   1091 zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
   1092     void *data, uint64_t size, int type, zio_priority_t priority,
   1093     enum zio_flag flags, zio_done_func_t *done, void *private)
   1094 {
   1095 	enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
   1096 	zio_t *zio;
   1097 
   1098 	ASSERT(vd->vdev_parent ==
   1099 	    (pio->io_vd ? pio->io_vd : pio->io_spa->spa_root_vdev));
   1100 
   1101 	if (type == ZIO_TYPE_READ && bp != NULL) {
   1102 		/*
   1103 		 * If we have the bp, then the child should perform the
   1104 		 * checksum and the parent need not.  This pushes error
   1105 		 * detection as close to the leaves as possible and
   1106 		 * eliminates redundant checksums in the interior nodes.
   1107 		 */
   1108 		pipeline |= ZIO_STAGE_CHECKSUM_VERIFY;
   1109 		pio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
   1110 	}
   1111 
   1112 	/* Not all IO types require vdev io done stage e.g. free */
   1113 	if (!(pio->io_pipeline & ZIO_STAGE_VDEV_IO_DONE))
   1114 		pipeline &= ~ZIO_STAGE_VDEV_IO_DONE;
   1115 
   1116 	if (vd->vdev_children == 0)
   1117 		offset += VDEV_LABEL_START_SIZE;
   1118 
   1119 	flags |= ZIO_VDEV_CHILD_FLAGS(pio) | ZIO_FLAG_DONT_PROPAGATE;
   1120 
   1121 	/*
   1122 	 * If we've decided to do a repair, the write is not speculative --
   1123 	 * even if the original read was.
   1124 	 */
   1125 	if (flags & ZIO_FLAG_IO_REPAIR)
   1126 		flags &= ~ZIO_FLAG_SPECULATIVE;
   1127 
   1128 	/*
   1129 	 * If we're creating a child I/O that is not associated with a
   1130 	 * top-level vdev, then the child zio is not an allocating I/O.
   1131 	 * If this is a retried I/O then we ignore it since we will
   1132 	 * have already processed the original allocating I/O.
   1133 	 */
   1134 	if (flags & ZIO_FLAG_IO_ALLOCATING &&
   1135 	    (vd != vd->vdev_top || (flags & ZIO_FLAG_IO_RETRY))) {
   1136 		metaslab_class_t *mc = spa_normal_class(pio->io_spa);
   1137 
   1138 		ASSERT(mc->mc_alloc_throttle_enabled);
   1139 		ASSERT(type == ZIO_TYPE_WRITE);
   1140 		ASSERT(priority == ZIO_PRIORITY_ASYNC_WRITE);
   1141 		ASSERT(!(flags & ZIO_FLAG_IO_REPAIR));
   1142 		ASSERT(!(pio->io_flags & ZIO_FLAG_IO_REWRITE) ||
   1143 		    pio->io_child_type == ZIO_CHILD_GANG);
   1144 
   1145 		flags &= ~ZIO_FLAG_IO_ALLOCATING;
   1146 	}
   1147 
   1148 	zio = zio_create(pio, pio->io_spa, pio->io_txg, bp, data, size,
   1149 	    done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
   1150 	    ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
   1151 	ASSERT3U(zio->io_child_type, ==, ZIO_CHILD_VDEV);
   1152 
   1153 	zio->io_physdone = pio->io_physdone;
   1154 	if (vd->vdev_ops->vdev_op_leaf && zio->io_logical != NULL)
   1155 		zio->io_logical->io_phys_children++;
   1156 
   1157 	return (zio);
   1158 }
   1159 
   1160 zio_t *
   1161 zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, void *data, uint64_t size,
   1162     int type, zio_priority_t priority, enum zio_flag flags,
   1163     zio_done_func_t *done, void *private)
   1164 {
   1165 	zio_t *zio;
   1166 
   1167 	ASSERT(vd->vdev_ops->vdev_op_leaf);
   1168 
   1169 	zio = zio_create(NULL, vd->vdev_spa, 0, NULL,
   1170 	    data, size, done, private, type, priority,
   1171 	    flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY | ZIO_FLAG_DELEGATED,
   1172 	    vd, offset, NULL,
   1173 	    ZIO_STAGE_VDEV_IO_START >> 1, ZIO_VDEV_CHILD_PIPELINE);
   1174 
   1175 	return (zio);
   1176 }
   1177 
   1178 void
   1179 zio_flush(zio_t *zio, vdev_t *vd)
   1180 {
   1181 	zio_nowait(zio_ioctl(zio, zio->io_spa, vd, DKIOCFLUSHWRITECACHE, 0, 0,
   1182 	    NULL, NULL, ZIO_PRIORITY_NOW,
   1183 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
   1184 }
   1185 
   1186 zio_t *
   1187 zio_trim(zio_t *zio, spa_t *spa, vdev_t *vd, uint64_t offset, uint64_t size)
   1188 {
   1189 
   1190 	ASSERT(vd->vdev_ops->vdev_op_leaf);
   1191 
   1192 	return (zio_create(zio, spa, 0, NULL, NULL, size, NULL, NULL,
   1193 	    ZIO_TYPE_FREE, ZIO_PRIORITY_TRIM, ZIO_FLAG_DONT_AGGREGATE |
   1194 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY,
   1195 	    vd, offset, NULL, ZIO_STAGE_OPEN, ZIO_FREE_PHYS_PIPELINE));
   1196 }
   1197 
   1198 void
   1199 zio_shrink(zio_t *zio, uint64_t size)
   1200 {
   1201 	ASSERT(zio->io_executor == NULL);
   1202 	ASSERT(zio->io_orig_size == zio->io_size);
   1203 	ASSERT(size <= zio->io_size);
   1204 
   1205 	/*
   1206 	 * We don't shrink for raidz because of problems with the
   1207 	 * reconstruction when reading back less than the block size.
   1208 	 * Note, BP_IS_RAIDZ() assumes no compression.
   1209 	 */
   1210 	ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
   1211 	if (!BP_IS_RAIDZ(zio->io_bp))
   1212 		zio->io_orig_size = zio->io_size = size;
   1213 }
   1214 
   1215 /*
   1216  * ==========================================================================
   1217  * Prepare to read and write logical blocks
   1218  * ==========================================================================
   1219  */
   1220 
   1221 static int
   1222 zio_read_bp_init(zio_t *zio)
   1223 {
   1224 	blkptr_t *bp = zio->io_bp;
   1225 
   1226 	if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
   1227 	    zio->io_child_type == ZIO_CHILD_LOGICAL &&
   1228 	    !(zio->io_flags & ZIO_FLAG_RAW)) {
   1229 		uint64_t psize =
   1230 		    BP_IS_EMBEDDED(bp) ? BPE_GET_PSIZE(bp) : BP_GET_PSIZE(bp);
   1231 		void *cbuf = zio_buf_alloc(psize);
   1232 
   1233 		zio_push_transform(zio, cbuf, psize, psize, zio_decompress);
   1234 	}
   1235 
   1236 	if (BP_IS_EMBEDDED(bp) && BPE_GET_ETYPE(bp) == BP_EMBEDDED_TYPE_DATA) {
   1237 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   1238 		decode_embedded_bp_compressed(bp, zio->io_data);
   1239 	} else {
   1240 		ASSERT(!BP_IS_EMBEDDED(bp));
   1241 	}
   1242 
   1243 	if (!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)) && BP_GET_LEVEL(bp) == 0)
   1244 		zio->io_flags |= ZIO_FLAG_DONT_CACHE;
   1245 
   1246 	if (BP_GET_TYPE(bp) == DMU_OT_DDT_ZAP)
   1247 		zio->io_flags |= ZIO_FLAG_DONT_CACHE;
   1248 
   1249 	if (BP_GET_DEDUP(bp) && zio->io_child_type == ZIO_CHILD_LOGICAL)
   1250 		zio->io_pipeline = ZIO_DDT_READ_PIPELINE;
   1251 
   1252 	return (ZIO_PIPELINE_CONTINUE);
   1253 }
   1254 
   1255 static int
   1256 zio_write_bp_init(zio_t *zio)
   1257 {
   1258 	if (!IO_IS_ALLOCATING(zio))
   1259 		return (ZIO_PIPELINE_CONTINUE);
   1260 
   1261 	ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
   1262 
   1263 	if (zio->io_bp_override) {
   1264 		blkptr_t *bp = zio->io_bp;
   1265 		zio_prop_t *zp = &zio->io_prop;
   1266 
   1267 		ASSERT(bp->blk_birth != zio->io_txg);
   1268 		ASSERT(BP_GET_DEDUP(zio->io_bp_override) == 0);
   1269 
   1270 		*bp = *zio->io_bp_override;
   1271 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   1272 
   1273 		if (BP_IS_EMBEDDED(bp))
   1274 			return (ZIO_PIPELINE_CONTINUE);
   1275 
   1276 		/*
   1277 		 * If we've been overridden and nopwrite is set then
   1278 		 * set the flag accordingly to indicate that a nopwrite
   1279 		 * has already occurred.
   1280 		 */
   1281 		if (!BP_IS_HOLE(bp) && zp->zp_nopwrite) {
   1282 			ASSERT(!zp->zp_dedup);
   1283 			ASSERT3U(BP_GET_CHECKSUM(bp), ==, zp->zp_checksum);
   1284 			zio->io_flags |= ZIO_FLAG_NOPWRITE;
   1285 			return (ZIO_PIPELINE_CONTINUE);
   1286 		}
   1287 
   1288 		ASSERT(!zp->zp_nopwrite);
   1289 
   1290 		if (BP_IS_HOLE(bp) || !zp->zp_dedup)
   1291 			return (ZIO_PIPELINE_CONTINUE);
   1292 
   1293 		ASSERT((zio_checksum_table[zp->zp_checksum].ci_flags &
   1294 		    ZCHECKSUM_FLAG_DEDUP) || zp->zp_dedup_verify);
   1295 
   1296 		if (BP_GET_CHECKSUM(bp) == zp->zp_checksum) {
   1297 			BP_SET_DEDUP(bp, 1);
   1298 			zio->io_pipeline |= ZIO_STAGE_DDT_WRITE;
   1299 			return (ZIO_PIPELINE_CONTINUE);
   1300 		}
   1301 
   1302 		/*
   1303 		 * We were unable to handle this as an override bp, treat
   1304 		 * it as a regular write I/O.
   1305 		 */
   1306 		zio->io_bp_override = NULL;
   1307 		*bp = zio->io_bp_orig;
   1308 		zio->io_pipeline = zio->io_orig_pipeline;
   1309 	}
   1310 
   1311 	return (ZIO_PIPELINE_CONTINUE);
   1312 }
   1313 
   1314 static int
   1315 zio_write_compress(zio_t *zio)
   1316 {
   1317 	spa_t *spa = zio->io_spa;
   1318 	zio_prop_t *zp = &zio->io_prop;
   1319 	enum zio_compress compress = zp->zp_compress;
   1320 	blkptr_t *bp = zio->io_bp;
   1321 	uint64_t lsize = zio->io_size;
   1322 	uint64_t psize = lsize;
   1323 	int pass = 1;
   1324 
   1325 	/*
   1326 	 * If our children haven't all reached the ready stage,
   1327 	 * wait for them and then repeat this pipeline stage.
   1328 	 */
   1329 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
   1330 	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY))
   1331 		return (ZIO_PIPELINE_STOP);
   1332 
   1333 	if (!IO_IS_ALLOCATING(zio))
   1334 		return (ZIO_PIPELINE_CONTINUE);
   1335 
   1336 	if (zio->io_children_ready != NULL) {
   1337 		/*
   1338 		 * Now that all our children are ready, run the callback
   1339 		 * associated with this zio in case it wants to modify the
   1340 		 * data to be written.
   1341 		 */
   1342 		ASSERT3U(zp->zp_level, >, 0);
   1343 		zio->io_children_ready(zio);
   1344 	}
   1345 
   1346 	ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
   1347 	ASSERT(zio->io_bp_override == NULL);
   1348 
   1349 	if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg) {
   1350 		/*
   1351 		 * We're rewriting an existing block, which means we're
   1352 		 * working on behalf of spa_sync().  For spa_sync() to
   1353 		 * converge, it must eventually be the case that we don't
   1354 		 * have to allocate new blocks.  But compression changes
   1355 		 * the blocksize, which forces a reallocate, and makes
   1356 		 * convergence take longer.  Therefore, after the first
   1357 		 * few passes, stop compressing to ensure convergence.
   1358 		 */
   1359 		pass = spa_sync_pass(spa);
   1360 
   1361 		ASSERT(zio->io_txg == spa_syncing_txg(spa));
   1362 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   1363 		ASSERT(!BP_GET_DEDUP(bp));
   1364 
   1365 		if (pass >= zfs_sync_pass_dont_compress)
   1366 			compress = ZIO_COMPRESS_OFF;
   1367 
   1368 		/* Make sure someone doesn't change their mind on overwrites */
   1369 		ASSERT(BP_IS_EMBEDDED(bp) || MIN(zp->zp_copies + BP_IS_GANG(bp),
   1370 		    spa_max_replication(spa)) == BP_GET_NDVAS(bp));
   1371 	}
   1372 
   1373 	if (compress != ZIO_COMPRESS_OFF) {
   1374 		void *cbuf = zio_buf_alloc(lsize);
   1375 		psize = zio_compress_data(compress, zio->io_data, cbuf, lsize);
   1376 		if (psize == 0 || psize == lsize) {
   1377 			compress = ZIO_COMPRESS_OFF;
   1378 			zio_buf_free(cbuf, lsize);
   1379 		} else if (!zp->zp_dedup && psize <= BPE_PAYLOAD_SIZE &&
   1380 		    zp->zp_level == 0 && !DMU_OT_HAS_FILL(zp->zp_type) &&
   1381 		    spa_feature_is_enabled(spa, SPA_FEATURE_EMBEDDED_DATA)) {
   1382 			encode_embedded_bp_compressed(bp,
   1383 			    cbuf, compress, lsize, psize);
   1384 			BPE_SET_ETYPE(bp, BP_EMBEDDED_TYPE_DATA);
   1385 			BP_SET_TYPE(bp, zio->io_prop.zp_type);
   1386 			BP_SET_LEVEL(bp, zio->io_prop.zp_level);
   1387 			zio_buf_free(cbuf, lsize);
   1388 			bp->blk_birth = zio->io_txg;
   1389 			zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   1390 			ASSERT(spa_feature_is_active(spa,
   1391 			    SPA_FEATURE_EMBEDDED_DATA));
   1392 			return (ZIO_PIPELINE_CONTINUE);
   1393 		} else {
   1394 			/*
   1395 			 * Round up compressed size up to the ashift
   1396 			 * of the smallest-ashift device, and zero the tail.
   1397 			 * This ensures that the compressed size of the BP
   1398 			 * (and thus compressratio property) are correct,
   1399 			 * in that we charge for the padding used to fill out
   1400 			 * the last sector.
   1401 			 */
   1402 			ASSERT3U(spa->spa_min_ashift, >=, SPA_MINBLOCKSHIFT);
   1403 			size_t rounded = (size_t)P2ROUNDUP(psize,
   1404 			    1ULL << spa->spa_min_ashift);
   1405 			if (rounded >= lsize) {
   1406 				compress = ZIO_COMPRESS_OFF;
   1407 				zio_buf_free(cbuf, lsize);
   1408 				psize = lsize;
   1409 			} else {
   1410 				bzero((char *)cbuf + psize, rounded - psize);
   1411 				psize = rounded;
   1412 				zio_push_transform(zio, cbuf,
   1413 				    psize, lsize, NULL);
   1414 			}
   1415 		}
   1416 
   1417 		/*
   1418 		 * We were unable to handle this as an override bp, treat
   1419 		 * it as a regular write I/O.
   1420 		 */
   1421 		zio->io_bp_override = NULL;
   1422 		*bp = zio->io_bp_orig;
   1423 		zio->io_pipeline = zio->io_orig_pipeline;
   1424 	}
   1425 
   1426 	/*
   1427 	 * The final pass of spa_sync() must be all rewrites, but the first
   1428 	 * few passes offer a trade-off: allocating blocks defers convergence,
   1429 	 * but newly allocated blocks are sequential, so they can be written
   1430 	 * to disk faster.  Therefore, we allow the first few passes of
   1431 	 * spa_sync() to allocate new blocks, but force rewrites after that.
   1432 	 * There should only be a handful of blocks after pass 1 in any case.
   1433 	 */
   1434 	if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg &&
   1435 	    BP_GET_PSIZE(bp) == psize &&
   1436 	    pass >= zfs_sync_pass_rewrite) {
   1437 		ASSERT(psize != 0);
   1438 		enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
   1439 		zio->io_pipeline = ZIO_REWRITE_PIPELINE | gang_stages;
   1440 		zio->io_flags |= ZIO_FLAG_IO_REWRITE;
   1441 	} else {
   1442 		BP_ZERO(bp);
   1443 		zio->io_pipeline = ZIO_WRITE_PIPELINE;
   1444 	}
   1445 
   1446 	if (psize == 0) {
   1447 		if (zio->io_bp_orig.blk_birth != 0 &&
   1448 		    spa_feature_is_active(spa, SPA_FEATURE_HOLE_BIRTH)) {
   1449 			BP_SET_LSIZE(bp, lsize);
   1450 			BP_SET_TYPE(bp, zp->zp_type);
   1451 			BP_SET_LEVEL(bp, zp->zp_level);
   1452 			BP_SET_BIRTH(bp, zio->io_txg, 0);
   1453 		}
   1454 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   1455 	} else {
   1456 		ASSERT(zp->zp_checksum != ZIO_CHECKSUM_GANG_HEADER);
   1457 		BP_SET_LSIZE(bp, lsize);
   1458 		BP_SET_TYPE(bp, zp->zp_type);
   1459 		BP_SET_LEVEL(bp, zp->zp_level);
   1460 		BP_SET_PSIZE(bp, psize);
   1461 		BP_SET_COMPRESS(bp, compress);
   1462 		BP_SET_CHECKSUM(bp, zp->zp_checksum);
   1463 		BP_SET_DEDUP(bp, zp->zp_dedup);
   1464 		BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
   1465 		if (zp->zp_dedup) {
   1466 			ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   1467 			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
   1468 			zio->io_pipeline = ZIO_DDT_WRITE_PIPELINE;
   1469 		}
   1470 		if (zp->zp_nopwrite) {
   1471 			ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   1472 			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
   1473 			zio->io_pipeline |= ZIO_STAGE_NOP_WRITE;
   1474 		}
   1475 	}
   1476 	return (ZIO_PIPELINE_CONTINUE);
   1477 }
   1478 
   1479 static int
   1480 zio_free_bp_init(zio_t *zio)
   1481 {
   1482 	blkptr_t *bp = zio->io_bp;
   1483 
   1484 	if (zio->io_child_type == ZIO_CHILD_LOGICAL) {
   1485 		if (BP_GET_DEDUP(bp))
   1486 			zio->io_pipeline = ZIO_DDT_FREE_PIPELINE;
   1487 	}
   1488 
   1489 	return (ZIO_PIPELINE_CONTINUE);
   1490 }
   1491 
   1492 /*
   1493  * ==========================================================================
   1494  * Execute the I/O pipeline
   1495  * ==========================================================================
   1496  */
   1497 
   1498 static void
   1499 zio_taskq_dispatch(zio_t *zio, zio_taskq_type_t q, boolean_t cutinline)
   1500 {
   1501 	spa_t *spa = zio->io_spa;
   1502 	zio_type_t t = zio->io_type;
   1503 	int flags = (cutinline ? TQ_FRONT : 0);
   1504 
   1505 	ASSERT(q == ZIO_TASKQ_ISSUE || q == ZIO_TASKQ_INTERRUPT);
   1506 
   1507 	/*
   1508 	 * If we're a config writer or a probe, the normal issue and
   1509 	 * interrupt threads may all be blocked waiting for the config lock.
   1510 	 * In this case, select the otherwise-unused taskq for ZIO_TYPE_NULL.
   1511 	 */
   1512 	if (zio->io_flags & (ZIO_FLAG_CONFIG_WRITER | ZIO_FLAG_PROBE))
   1513 		t = ZIO_TYPE_NULL;
   1514 
   1515 	/*
   1516 	 * A similar issue exists for the L2ARC write thread until L2ARC 2.0.
   1517 	 */
   1518 	if (t == ZIO_TYPE_WRITE && zio->io_vd && zio->io_vd->vdev_aux)
   1519 		t = ZIO_TYPE_NULL;
   1520 
   1521 	/*
   1522 	 * If this is a high priority I/O, then use the high priority taskq if
   1523 	 * available.
   1524 	 */
   1525 	if (zio->io_priority == ZIO_PRIORITY_NOW &&
   1526 	    spa->spa_zio_taskq[t][q + 1].stqs_count != 0)
   1527 		q++;
   1528 
   1529 	ASSERT3U(q, <, ZIO_TASKQ_TYPES);
   1530 
   1531 	/*
   1532 	 * NB: We are assuming that the zio can only be dispatched
   1533 	 * to a single taskq at a time.  It would be a grievous error
   1534 	 * to dispatch the zio to another taskq at the same time.
   1535 	 */
   1536 #if defined(illumos) || !defined(_KERNEL)
   1537 	ASSERT(zio->io_tqent.tqent_next == NULL);
   1538 #elif defined(__NetBSD__)
   1539 	ASSERT(zio->io_tqent.tqent_queued == 0);
   1540 #else
   1541 	ASSERT(zio->io_tqent.tqent_task.ta_pending == 0);
   1542 #endif
   1543 	spa_taskq_dispatch_ent(spa, t, q, (task_func_t *)zio_execute, zio,
   1544 	    flags, &zio->io_tqent);
   1545 }
   1546 
   1547 static boolean_t
   1548 zio_taskq_member(zio_t *zio, zio_taskq_type_t q)
   1549 {
   1550 	kthread_t *executor = zio->io_executor;
   1551 	spa_t *spa = zio->io_spa;
   1552 
   1553 	for (zio_type_t t = 0; t < ZIO_TYPES; t++) {
   1554 		spa_taskqs_t *tqs = &spa->spa_zio_taskq[t][q];
   1555 		uint_t i;
   1556 		for (i = 0; i < tqs->stqs_count; i++) {
   1557 			if (taskq_member(tqs->stqs_taskq[i], executor))
   1558 				return (B_TRUE);
   1559 		}
   1560 	}
   1561 
   1562 	return (B_FALSE);
   1563 }
   1564 
   1565 static int
   1566 zio_issue_async(zio_t *zio)
   1567 {
   1568 	zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
   1569 
   1570 	return (ZIO_PIPELINE_STOP);
   1571 }
   1572 
   1573 void
   1574 zio_interrupt(zio_t *zio)
   1575 {
   1576 	zio_taskq_dispatch(zio, ZIO_TASKQ_INTERRUPT, B_FALSE);
   1577 }
   1578 
   1579 void
   1580 zio_delay_interrupt(zio_t *zio)
   1581 {
   1582 	/*
   1583 	 * The timeout_generic() function isn't defined in userspace, so
   1584 	 * rather than trying to implement the function, the zio delay
   1585 	 * functionality has been disabled for userspace builds.
   1586 	 */
   1587 
   1588 #ifndef __NetBSD__
   1589 	/* XXXNETBSD implement timeout_generic() with a callout_t in zio_t */
   1590 	/*
   1591 	 * If io_target_timestamp is zero, then no delay has been registered
   1592 	 * for this IO, thus jump to the end of this function and "skip" the
   1593 	 * delay; issuing it directly to the zio layer.
   1594 	 */
   1595 	if (zio->io_target_timestamp != 0) {
   1596 		hrtime_t now = gethrtime();
   1597 
   1598 		if (now >= zio->io_target_timestamp) {
   1599 			/*
   1600 			 * This IO has already taken longer than the target
   1601 			 * delay to complete, so we don't want to delay it
   1602 			 * any longer; we "miss" the delay and issue it
   1603 			 * directly to the zio layer. This is likely due to
   1604 			 * the target latency being set to a value less than
   1605 			 * the underlying hardware can satisfy (e.g. delay
   1606 			 * set to 1ms, but the disks take 10ms to complete an
   1607 			 * IO request).
   1608 			 */
   1609 
   1610 			DTRACE_PROBE2(zio__delay__miss, zio_t *, zio,
   1611 			    hrtime_t, now);
   1612 
   1613 			zio_interrupt(zio);
   1614 		} else {
   1615 			hrtime_t diff = zio->io_target_timestamp - now;
   1616 
   1617 			DTRACE_PROBE3(zio__delay__hit, zio_t *, zio,
   1618 			    hrtime_t, now, hrtime_t, diff);
   1619 
   1620 			(void) timeout_generic(CALLOUT_NORMAL,
   1621 			    (void (*)(void *))zio_interrupt, zio, diff, 1, 0);
   1622 		}
   1623 
   1624 		return;
   1625 	}
   1626 #endif
   1627 
   1628 	DTRACE_PROBE1(zio__delay__skip, zio_t *, zio);
   1629 	zio_interrupt(zio);
   1630 }
   1631 
   1632 /*
   1633  * Execute the I/O pipeline until one of the following occurs:
   1634  *
   1635  *	(1) the I/O completes
   1636  *	(2) the pipeline stalls waiting for dependent child I/Os
   1637  *	(3) the I/O issues, so we're waiting for an I/O completion interrupt
   1638  *	(4) the I/O is delegated by vdev-level caching or aggregation
   1639  *	(5) the I/O is deferred due to vdev-level queueing
   1640  *	(6) the I/O is handed off to another thread.
   1641  *
   1642  * In all cases, the pipeline stops whenever there's no CPU work; it never
   1643  * burns a thread in cv_wait().
   1644  *
   1645  * There's no locking on io_stage because there's no legitimate way
   1646  * for multiple threads to be attempting to process the same I/O.
   1647  */
   1648 static zio_pipe_stage_t *zio_pipeline[];
   1649 
   1650 void
   1651 zio_execute(zio_t *zio)
   1652 {
   1653 	zio->io_executor = curthread;
   1654 
   1655 	ASSERT3U(zio->io_queued_timestamp, >, 0);
   1656 
   1657 	while (zio->io_stage < ZIO_STAGE_DONE) {
   1658 		enum zio_stage pipeline = zio->io_pipeline;
   1659 		enum zio_stage stage = zio->io_stage;
   1660 		int rv;
   1661 
   1662 		ASSERT(!MUTEX_HELD(&zio->io_lock));
   1663 		ASSERT(ISP2(stage));
   1664 		ASSERT(zio->io_stall == NULL);
   1665 
   1666 		do {
   1667 			stage <<= 1;
   1668 		} while ((stage & pipeline) == 0);
   1669 
   1670 		ASSERT(stage <= ZIO_STAGE_DONE);
   1671 
   1672 		/*
   1673 		 * If we are in interrupt context and this pipeline stage
   1674 		 * will grab a config lock that is held across I/O,
   1675 		 * or may wait for an I/O that needs an interrupt thread
   1676 		 * to complete, issue async to avoid deadlock.
   1677 		 *
   1678 		 * For VDEV_IO_START, we cut in line so that the io will
   1679 		 * be sent to disk promptly.
   1680 		 */
   1681 		if ((stage & ZIO_BLOCKING_STAGES) && zio->io_vd == NULL &&
   1682 		    zio_taskq_member(zio, ZIO_TASKQ_INTERRUPT)) {
   1683 			boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
   1684 			    zio_requeue_io_start_cut_in_line : B_FALSE;
   1685 			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
   1686 			return;
   1687 		}
   1688 
   1689 		zio->io_stage = stage;
   1690 		zio->io_pipeline_trace |= zio->io_stage;
   1691 		rv = zio_pipeline[highbit64(stage) - 1](zio);
   1692 
   1693 		if (rv == ZIO_PIPELINE_STOP)
   1694 			return;
   1695 
   1696 		ASSERT(rv == ZIO_PIPELINE_CONTINUE);
   1697 	}
   1698 }
   1699 
   1700 /*
   1701  * ==========================================================================
   1702  * Initiate I/O, either sync or async
   1703  * ==========================================================================
   1704  */
   1705 int
   1706 zio_wait(zio_t *zio)
   1707 {
   1708 	int error;
   1709 
   1710 	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
   1711 	ASSERT(zio->io_executor == NULL);
   1712 
   1713 	zio->io_waiter = curthread;
   1714 	ASSERT0(zio->io_queued_timestamp);
   1715 	zio->io_queued_timestamp = gethrtime();
   1716 
   1717 	zio_execute(zio);
   1718 
   1719 	mutex_enter(&zio->io_lock);
   1720 	while (zio->io_executor != NULL)
   1721 		cv_wait(&zio->io_cv, &zio->io_lock);
   1722 	mutex_exit(&zio->io_lock);
   1723 
   1724 	error = zio->io_error;
   1725 	zio_destroy(zio);
   1726 
   1727 	return (error);
   1728 }
   1729 
   1730 void
   1731 zio_nowait(zio_t *zio)
   1732 {
   1733 	ASSERT(zio->io_executor == NULL);
   1734 
   1735 	if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
   1736 	    zio_unique_parent(zio) == NULL) {
   1737 		/*
   1738 		 * This is a logical async I/O with no parent to wait for it.
   1739 		 * We add it to the spa_async_root_zio "Godfather" I/O which
   1740 		 * will ensure they complete prior to unloading the pool.
   1741 		 */
   1742 		spa_t *spa = zio->io_spa;
   1743 
   1744 		zio_add_child(spa->spa_async_zio_root[CPU_SEQID], zio);
   1745 	}
   1746 
   1747 	ASSERT0(zio->io_queued_timestamp);
   1748 	zio->io_queued_timestamp = gethrtime();
   1749 	zio_execute(zio);
   1750 }
   1751 
   1752 /*
   1753  * ==========================================================================
   1754  * Reexecute or suspend/resume failed I/O
   1755  * ==========================================================================
   1756  */
   1757 
   1758 static void
   1759 zio_reexecute(zio_t *pio)
   1760 {
   1761 	zio_t *cio, *cio_next;
   1762 
   1763 	ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
   1764 	ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
   1765 	ASSERT(pio->io_gang_leader == NULL);
   1766 	ASSERT(pio->io_gang_tree == NULL);
   1767 
   1768 	pio->io_flags = pio->io_orig_flags;
   1769 	pio->io_stage = pio->io_orig_stage;
   1770 	pio->io_pipeline = pio->io_orig_pipeline;
   1771 	pio->io_reexecute = 0;
   1772 	pio->io_flags |= ZIO_FLAG_REEXECUTED;
   1773 	pio->io_pipeline_trace = 0;
   1774 	pio->io_error = 0;
   1775 	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
   1776 		pio->io_state[w] = 0;
   1777 	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
   1778 		pio->io_child_error[c] = 0;
   1779 
   1780 	if (IO_IS_ALLOCATING(pio))
   1781 		BP_ZERO(pio->io_bp);
   1782 
   1783 	/*
   1784 	 * As we reexecute pio's children, new children could be created.
   1785 	 * New children go to the head of pio's io_child_list, however,
   1786 	 * so we will (correctly) not reexecute them.  The key is that
   1787 	 * the remainder of pio's io_child_list, from 'cio_next' onward,
   1788 	 * cannot be affected by any side effects of reexecuting 'cio'.
   1789 	 */
   1790 	zio_link_t *zl = NULL;
   1791 	for (cio = zio_walk_children(pio, &zl); cio != NULL; cio = cio_next) {
   1792 		cio_next = zio_walk_children(pio, &zl);
   1793 		mutex_enter(&pio->io_lock);
   1794 		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
   1795 			pio->io_children[cio->io_child_type][w]++;
   1796 		mutex_exit(&pio->io_lock);
   1797 		zio_reexecute(cio);
   1798 	}
   1799 
   1800 	/*
   1801 	 * Now that all children have been reexecuted, execute the parent.
   1802 	 * We don't reexecute "The Godfather" I/O here as it's the
   1803 	 * responsibility of the caller to wait on him.
   1804 	 */
   1805 	if (!(pio->io_flags & ZIO_FLAG_GODFATHER)) {
   1806 		pio->io_queued_timestamp = gethrtime();
   1807 		zio_execute(pio);
   1808 	}
   1809 }
   1810 
   1811 void
   1812 zio_suspend(spa_t *spa, zio_t *zio)
   1813 {
   1814 	if (spa_get_failmode(spa) == ZIO_FAILURE_MODE_PANIC)
   1815 		fm_panic("Pool '%s' has encountered an uncorrectable I/O "
   1816 		    "failure and the failure mode property for this pool "
   1817 		    "is set to panic.", spa_name(spa));
   1818 
   1819 	zfs_ereport_post(FM_EREPORT_ZFS_IO_FAILURE, spa, NULL, NULL, 0, 0);
   1820 
   1821 	mutex_enter(&spa->spa_suspend_lock);
   1822 
   1823 	if (spa->spa_suspend_zio_root == NULL)
   1824 		spa->spa_suspend_zio_root = zio_root(spa, NULL, NULL,
   1825 		    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE |
   1826 		    ZIO_FLAG_GODFATHER);
   1827 
   1828 	spa->spa_suspended = B_TRUE;
   1829 
   1830 	if (zio != NULL) {
   1831 		ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
   1832 		ASSERT(zio != spa->spa_suspend_zio_root);
   1833 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   1834 		ASSERT(zio_unique_parent(zio) == NULL);
   1835 		ASSERT(zio->io_stage == ZIO_STAGE_DONE);
   1836 		zio_add_child(spa->spa_suspend_zio_root, zio);
   1837 	}
   1838 
   1839 	mutex_exit(&spa->spa_suspend_lock);
   1840 }
   1841 
   1842 int
   1843 zio_resume(spa_t *spa)
   1844 {
   1845 	zio_t *pio;
   1846 
   1847 	/*
   1848 	 * Reexecute all previously suspended i/o.
   1849 	 */
   1850 	mutex_enter(&spa->spa_suspend_lock);
   1851 	spa->spa_suspended = B_FALSE;
   1852 	cv_broadcast(&spa->spa_suspend_cv);
   1853 	pio = spa->spa_suspend_zio_root;
   1854 	spa->spa_suspend_zio_root = NULL;
   1855 	mutex_exit(&spa->spa_suspend_lock);
   1856 
   1857 	if (pio == NULL)
   1858 		return (0);
   1859 
   1860 	zio_reexecute(pio);
   1861 	return (zio_wait(pio));
   1862 }
   1863 
   1864 void
   1865 zio_resume_wait(spa_t *spa)
   1866 {
   1867 	mutex_enter(&spa->spa_suspend_lock);
   1868 	while (spa_suspended(spa))
   1869 		cv_wait(&spa->spa_suspend_cv, &spa->spa_suspend_lock);
   1870 	mutex_exit(&spa->spa_suspend_lock);
   1871 }
   1872 
   1873 /*
   1874  * ==========================================================================
   1875  * Gang blocks.
   1876  *
   1877  * A gang block is a collection of small blocks that looks to the DMU
   1878  * like one large block.  When zio_dva_allocate() cannot find a block
   1879  * of the requested size, due to either severe fragmentation or the pool
   1880  * being nearly full, it calls zio_write_gang_block() to construct the
   1881  * block from smaller fragments.
   1882  *
   1883  * A gang block consists of a gang header (zio_gbh_phys_t) and up to
   1884  * three (SPA_GBH_NBLKPTRS) gang members.  The gang header is just like
   1885  * an indirect block: it's an array of block pointers.  It consumes
   1886  * only one sector and hence is allocatable regardless of fragmentation.
   1887  * The gang header's bps point to its gang members, which hold the data.
   1888  *
   1889  * Gang blocks are self-checksumming, using the bp's <vdev, offset, txg>
   1890  * as the verifier to ensure uniqueness of the SHA256 checksum.
   1891  * Critically, the gang block bp's blk_cksum is the checksum of the data,
   1892  * not the gang header.  This ensures that data block signatures (needed for
   1893  * deduplication) are independent of how the block is physically stored.
   1894  *
   1895  * Gang blocks can be nested: a gang member may itself be a gang block.
   1896  * Thus every gang block is a tree in which root and all interior nodes are
   1897  * gang headers, and the leaves are normal blocks that contain user data.
   1898  * The root of the gang tree is called the gang leader.
   1899  *
   1900  * To perform any operation (read, rewrite, free, claim) on a gang block,
   1901  * zio_gang_assemble() first assembles the gang tree (minus data leaves)
   1902  * in the io_gang_tree field of the original logical i/o by recursively
   1903  * reading the gang leader and all gang headers below it.  This yields
   1904  * an in-core tree containing the contents of every gang header and the
   1905  * bps for every constituent of the gang block.
   1906  *
   1907  * With the gang tree now assembled, zio_gang_issue() just walks the gang tree
   1908  * and invokes a callback on each bp.  To free a gang block, zio_gang_issue()
   1909  * calls zio_free_gang() -- a trivial wrapper around zio_free() -- for each bp.
   1910  * zio_claim_gang() provides a similarly trivial wrapper for zio_claim().
   1911  * zio_read_gang() is a wrapper around zio_read() that omits reading gang
   1912  * headers, since we already have those in io_gang_tree.  zio_rewrite_gang()
   1913  * performs a zio_rewrite() of the data or, for gang headers, a zio_rewrite()
   1914  * of the gang header plus zio_checksum_compute() of the data to update the
   1915  * gang header's blk_cksum as described above.
   1916  *
   1917  * The two-phase assemble/issue model solves the problem of partial failure --
   1918  * what if you'd freed part of a gang block but then couldn't read the
   1919  * gang header for another part?  Assembling the entire gang tree first
   1920  * ensures that all the necessary gang header I/O has succeeded before
   1921  * starting the actual work of free, claim, or write.  Once the gang tree
   1922  * is assembled, free and claim are in-memory operations that cannot fail.
   1923  *
   1924  * In the event that a gang write fails, zio_dva_unallocate() walks the
   1925  * gang tree to immediately free (i.e. insert back into the space map)
   1926  * everything we've allocated.  This ensures that we don't get ENOSPC
   1927  * errors during repeated suspend/resume cycles due to a flaky device.
   1928  *
   1929  * Gang rewrites only happen during sync-to-convergence.  If we can't assemble
   1930  * the gang tree, we won't modify the block, so we can safely defer the free
   1931  * (knowing that the block is still intact).  If we *can* assemble the gang
   1932  * tree, then even if some of the rewrites fail, zio_dva_unallocate() will free
   1933  * each constituent bp and we can allocate a new block on the next sync pass.
   1934  *
   1935  * In all cases, the gang tree allows complete recovery from partial failure.
   1936  * ==========================================================================
   1937  */
   1938 
   1939 static zio_t *
   1940 zio_read_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
   1941 {
   1942 	if (gn != NULL)
   1943 		return (pio);
   1944 
   1945 	return (zio_read(pio, pio->io_spa, bp, data, BP_GET_PSIZE(bp),
   1946 	    NULL, NULL, pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
   1947 	    &pio->io_bookmark));
   1948 }
   1949 
   1950 zio_t *
   1951 zio_rewrite_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
   1952 {
   1953 	zio_t *zio;
   1954 
   1955 	if (gn != NULL) {
   1956 		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
   1957 		    gn->gn_gbh, SPA_GANGBLOCKSIZE, NULL, NULL, pio->io_priority,
   1958 		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
   1959 		/*
   1960 		 * As we rewrite each gang header, the pipeline will compute
   1961 		 * a new gang block header checksum for it; but no one will
   1962 		 * compute a new data checksum, so we do that here.  The one
   1963 		 * exception is the gang leader: the pipeline already computed
   1964 		 * its data checksum because that stage precedes gang assembly.
   1965 		 * (Presently, nothing actually uses interior data checksums;
   1966 		 * this is just good hygiene.)
   1967 		 */
   1968 		if (gn != pio->io_gang_leader->io_gang_tree) {
   1969 			zio_checksum_compute(zio, BP_GET_CHECKSUM(bp),
   1970 			    data, BP_GET_PSIZE(bp));
   1971 		}
   1972 		/*
   1973 		 * If we are here to damage data for testing purposes,
   1974 		 * leave the GBH alone so that we can detect the damage.
   1975 		 */
   1976 		if (pio->io_gang_leader->io_flags & ZIO_FLAG_INDUCE_DAMAGE)
   1977 			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
   1978 	} else {
   1979 		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
   1980 		    data, BP_GET_PSIZE(bp), NULL, NULL, pio->io_priority,
   1981 		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
   1982 	}
   1983 
   1984 	return (zio);
   1985 }
   1986 
   1987 /* ARGSUSED */
   1988 zio_t *
   1989 zio_free_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
   1990 {
   1991 	return (zio_free_sync(pio, pio->io_spa, pio->io_txg, bp,
   1992 	    BP_IS_GANG(bp) ? SPA_GANGBLOCKSIZE : BP_GET_PSIZE(bp),
   1993 	    ZIO_GANG_CHILD_FLAGS(pio)));
   1994 }
   1995 
   1996 /* ARGSUSED */
   1997 zio_t *
   1998 zio_claim_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
   1999 {
   2000 	return (zio_claim(pio, pio->io_spa, pio->io_txg, bp,
   2001 	    NULL, NULL, ZIO_GANG_CHILD_FLAGS(pio)));
   2002 }
   2003 
   2004 static zio_gang_issue_func_t *zio_gang_issue_func[ZIO_TYPES] = {
   2005 	NULL,
   2006 	zio_read_gang,
   2007 	zio_rewrite_gang,
   2008 	zio_free_gang,
   2009 	zio_claim_gang,
   2010 	NULL
   2011 };
   2012 
   2013 static void zio_gang_tree_assemble_done(zio_t *zio);
   2014 
   2015 static zio_gang_node_t *
   2016 zio_gang_node_alloc(zio_gang_node_t **gnpp)
   2017 {
   2018 	zio_gang_node_t *gn;
   2019 
   2020 	ASSERT(*gnpp == NULL);
   2021 
   2022 	gn = kmem_zalloc(sizeof (*gn), KM_SLEEP);
   2023 	gn->gn_gbh = zio_buf_alloc(SPA_GANGBLOCKSIZE);
   2024 	*gnpp = gn;
   2025 
   2026 	return (gn);
   2027 }
   2028 
   2029 static void
   2030 zio_gang_node_free(zio_gang_node_t **gnpp)
   2031 {
   2032 	zio_gang_node_t *gn = *gnpp;
   2033 
   2034 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
   2035 		ASSERT(gn->gn_child[g] == NULL);
   2036 
   2037 	zio_buf_free(gn->gn_gbh, SPA_GANGBLOCKSIZE);
   2038 	kmem_free(gn, sizeof (*gn));
   2039 	*gnpp = NULL;
   2040 }
   2041 
   2042 static void
   2043 zio_gang_tree_free(zio_gang_node_t **gnpp)
   2044 {
   2045 	zio_gang_node_t *gn = *gnpp;
   2046 
   2047 	if (gn == NULL)
   2048 		return;
   2049 
   2050 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
   2051 		zio_gang_tree_free(&gn->gn_child[g]);
   2052 
   2053 	zio_gang_node_free(gnpp);
   2054 }
   2055 
   2056 static void
   2057 zio_gang_tree_assemble(zio_t *gio, blkptr_t *bp, zio_gang_node_t **gnpp)
   2058 {
   2059 	zio_gang_node_t *gn = zio_gang_node_alloc(gnpp);
   2060 
   2061 	ASSERT(gio->io_gang_leader == gio);
   2062 	ASSERT(BP_IS_GANG(bp));
   2063 
   2064 	zio_nowait(zio_read(gio, gio->io_spa, bp, gn->gn_gbh,
   2065 	    SPA_GANGBLOCKSIZE, zio_gang_tree_assemble_done, gn,
   2066 	    gio->io_priority, ZIO_GANG_CHILD_FLAGS(gio), &gio->io_bookmark));
   2067 }
   2068 
   2069 static void
   2070 zio_gang_tree_assemble_done(zio_t *zio)
   2071 {
   2072 	zio_t *gio = zio->io_gang_leader;
   2073 	zio_gang_node_t *gn = zio->io_private;
   2074 	blkptr_t *bp = zio->io_bp;
   2075 
   2076 	ASSERT(gio == zio_unique_parent(zio));
   2077 	ASSERT(zio->io_child_count == 0);
   2078 
   2079 	if (zio->io_error)
   2080 		return;
   2081 
   2082 	if (BP_SHOULD_BYTESWAP(bp))
   2083 		byteswap_uint64_array(zio->io_data, zio->io_size);
   2084 
   2085 	ASSERT(zio->io_data == gn->gn_gbh);
   2086 	ASSERT(zio->io_size == SPA_GANGBLOCKSIZE);
   2087 	ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
   2088 
   2089 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
   2090 		blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
   2091 		if (!BP_IS_GANG(gbp))
   2092 			continue;
   2093 		zio_gang_tree_assemble(gio, gbp, &gn->gn_child[g]);
   2094 	}
   2095 }
   2096 
   2097 static void
   2098 zio_gang_tree_issue(zio_t *pio, zio_gang_node_t *gn, blkptr_t *bp, void *data)
   2099 {
   2100 	zio_t *gio = pio->io_gang_leader;
   2101 	zio_t *zio;
   2102 
   2103 	ASSERT(BP_IS_GANG(bp) == !!gn);
   2104 	ASSERT(BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(gio->io_bp));
   2105 	ASSERT(BP_GET_LSIZE(bp) == BP_GET_PSIZE(bp) || gn == gio->io_gang_tree);
   2106 
   2107 	/*
   2108 	 * If you're a gang header, your data is in gn->gn_gbh.
   2109 	 * If you're a gang member, your data is in 'data' and gn == NULL.
   2110 	 */
   2111 	zio = zio_gang_issue_func[gio->io_type](pio, bp, gn, data);
   2112 
   2113 	if (gn != NULL) {
   2114 		ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
   2115 
   2116 		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
   2117 			blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
   2118 			if (BP_IS_HOLE(gbp))
   2119 				continue;
   2120 			zio_gang_tree_issue(zio, gn->gn_child[g], gbp, data);
   2121 			data = (char *)data + BP_GET_PSIZE(gbp);
   2122 		}
   2123 	}
   2124 
   2125 	if (gn == gio->io_gang_tree && gio->io_data != NULL)
   2126 		ASSERT3P((char *)gio->io_data + gio->io_size, ==, data);
   2127 
   2128 	if (zio != pio)
   2129 		zio_nowait(zio);
   2130 }
   2131 
   2132 static int
   2133 zio_gang_assemble(zio_t *zio)
   2134 {
   2135 	blkptr_t *bp = zio->io_bp;
   2136 
   2137 	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == NULL);
   2138 	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
   2139 
   2140 	zio->io_gang_leader = zio;
   2141 
   2142 	zio_gang_tree_assemble(zio, bp, &zio->io_gang_tree);
   2143 
   2144 	return (ZIO_PIPELINE_CONTINUE);
   2145 }
   2146 
   2147 static int
   2148 zio_gang_issue(zio_t *zio)
   2149 {
   2150 	blkptr_t *bp = zio->io_bp;
   2151 
   2152 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE))
   2153 		return (ZIO_PIPELINE_STOP);
   2154 
   2155 	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == zio);
   2156 	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
   2157 
   2158 	if (zio->io_child_error[ZIO_CHILD_GANG] == 0)
   2159 		zio_gang_tree_issue(zio, zio->io_gang_tree, bp, zio->io_data);
   2160 	else
   2161 		zio_gang_tree_free(&zio->io_gang_tree);
   2162 
   2163 	zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   2164 
   2165 	return (ZIO_PIPELINE_CONTINUE);
   2166 }
   2167 
   2168 static void
   2169 zio_write_gang_member_ready(zio_t *zio)
   2170 {
   2171 	zio_t *pio = zio_unique_parent(zio);
   2172 	zio_t *gio = zio->io_gang_leader;
   2173 	dva_t *cdva = zio->io_bp->blk_dva;
   2174 	dva_t *pdva = pio->io_bp->blk_dva;
   2175 	uint64_t asize;
   2176 
   2177 	if (BP_IS_HOLE(zio->io_bp))
   2178 		return;
   2179 
   2180 	ASSERT(BP_IS_HOLE(&zio->io_bp_orig));
   2181 
   2182 	ASSERT(zio->io_child_type == ZIO_CHILD_GANG);
   2183 	ASSERT3U(zio->io_prop.zp_copies, ==, gio->io_prop.zp_copies);
   2184 	ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
   2185 	ASSERT3U(pio->io_prop.zp_copies, <=, BP_GET_NDVAS(pio->io_bp));
   2186 	ASSERT3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
   2187 
   2188 	mutex_enter(&pio->io_lock);
   2189 	for (int d = 0; d < BP_GET_NDVAS(zio->io_bp); d++) {
   2190 		ASSERT(DVA_GET_GANG(&pdva[d]));
   2191 		asize = DVA_GET_ASIZE(&pdva[d]);
   2192 		asize += DVA_GET_ASIZE(&cdva[d]);
   2193 		DVA_SET_ASIZE(&pdva[d], asize);
   2194 	}
   2195 	mutex_exit(&pio->io_lock);
   2196 }
   2197 
   2198 static int
   2199 zio_write_gang_block(zio_t *pio)
   2200 {
   2201 	spa_t *spa = pio->io_spa;
   2202 	metaslab_class_t *mc = spa_normal_class(spa);
   2203 	blkptr_t *bp = pio->io_bp;
   2204 	zio_t *gio = pio->io_gang_leader;
   2205 	zio_t *zio;
   2206 	zio_gang_node_t *gn, **gnpp;
   2207 	zio_gbh_phys_t *gbh;
   2208 	uint64_t txg = pio->io_txg;
   2209 	uint64_t resid = pio->io_size;
   2210 	uint64_t lsize;
   2211 	int copies = gio->io_prop.zp_copies;
   2212 	int gbh_copies = MIN(copies + 1, spa_max_replication(spa));
   2213 	zio_prop_t zp;
   2214 	int error;
   2215 
   2216 	int flags = METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER;
   2217 	if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
   2218 		ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
   2219 		ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
   2220 
   2221 		flags |= METASLAB_ASYNC_ALLOC;
   2222 		VERIFY(refcount_held(&mc->mc_alloc_slots, pio));
   2223 
   2224 		/*
   2225 		 * The logical zio has already placed a reservation for
   2226 		 * 'copies' allocation slots but gang blocks may require
   2227 		 * additional copies. These additional copies
   2228 		 * (i.e. gbh_copies - copies) are guaranteed to succeed
   2229 		 * since metaslab_class_throttle_reserve() always allows
   2230 		 * additional reservations for gang blocks.
   2231 		 */
   2232 		VERIFY(metaslab_class_throttle_reserve(mc, gbh_copies - copies,
   2233 		    pio, flags));
   2234 	}
   2235 
   2236 	error = metaslab_alloc(spa, mc, SPA_GANGBLOCKSIZE,
   2237 	    bp, gbh_copies, txg, pio == gio ? NULL : gio->io_bp, flags,
   2238 	    &pio->io_alloc_list, pio);
   2239 	if (error) {
   2240 		if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
   2241 			ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
   2242 			ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
   2243 
   2244 			/*
   2245 			 * If we failed to allocate the gang block header then
   2246 			 * we remove any additional allocation reservations that
   2247 			 * we placed here. The original reservation will
   2248 			 * be removed when the logical I/O goes to the ready
   2249 			 * stage.
   2250 			 */
   2251 			metaslab_class_throttle_unreserve(mc,
   2252 			    gbh_copies - copies, pio);
   2253 		}
   2254 		pio->io_error = error;
   2255 		return (ZIO_PIPELINE_CONTINUE);
   2256 	}
   2257 
   2258 	if (pio == gio) {
   2259 		gnpp = &gio->io_gang_tree;
   2260 	} else {
   2261 		gnpp = pio->io_private;
   2262 		ASSERT(pio->io_ready == zio_write_gang_member_ready);
   2263 	}
   2264 
   2265 	gn = zio_gang_node_alloc(gnpp);
   2266 	gbh = gn->gn_gbh;
   2267 	bzero(gbh, SPA_GANGBLOCKSIZE);
   2268 
   2269 	/*
   2270 	 * Create the gang header.
   2271 	 */
   2272 	zio = zio_rewrite(pio, spa, txg, bp, gbh, SPA_GANGBLOCKSIZE, NULL, NULL,
   2273 	    pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
   2274 
   2275 	/*
   2276 	 * Create and nowait the gang children.
   2277 	 */
   2278 	for (int g = 0; resid != 0; resid -= lsize, g++) {
   2279 		lsize = P2ROUNDUP(resid / (SPA_GBH_NBLKPTRS - g),
   2280 		    SPA_MINBLOCKSIZE);
   2281 		ASSERT(lsize >= SPA_MINBLOCKSIZE && lsize <= resid);
   2282 
   2283 		zp.zp_checksum = gio->io_prop.zp_checksum;
   2284 		zp.zp_compress = ZIO_COMPRESS_OFF;
   2285 		zp.zp_type = DMU_OT_NONE;
   2286 		zp.zp_level = 0;
   2287 		zp.zp_copies = gio->io_prop.zp_copies;
   2288 		zp.zp_dedup = B_FALSE;
   2289 		zp.zp_dedup_verify = B_FALSE;
   2290 		zp.zp_nopwrite = B_FALSE;
   2291 
   2292 		zio_t *cio = zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
   2293 		    (char *)pio->io_data + (pio->io_size - resid), lsize, &zp,
   2294 		    zio_write_gang_member_ready, NULL, NULL, NULL,
   2295 		    &gn->gn_child[g], pio->io_priority,
   2296 		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
   2297 
   2298 		if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
   2299 			ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
   2300 			ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
   2301 
   2302 			/*
   2303 			 * Gang children won't throttle but we should
   2304 			 * account for their work, so reserve an allocation
   2305 			 * slot for them here.
   2306 			 */
   2307 			VERIFY(metaslab_class_throttle_reserve(mc,
   2308 			    zp.zp_copies, cio, flags));
   2309 		}
   2310 		zio_nowait(cio);
   2311 	}
   2312 
   2313 	/*
   2314 	 * Set pio's pipeline to just wait for zio to finish.
   2315 	 */
   2316 	pio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   2317 
   2318 	zio_nowait(zio);
   2319 
   2320 	return (ZIO_PIPELINE_CONTINUE);
   2321 }
   2322 
   2323 /*
   2324  * The zio_nop_write stage in the pipeline determines if allocating a
   2325  * new bp is necessary.  The nopwrite feature can handle writes in
   2326  * either syncing or open context (i.e. zil writes) and as a result is
   2327  * mutually exclusive with dedup.
   2328  *
   2329  * By leveraging a cryptographically secure checksum, such as SHA256, we
   2330  * can compare the checksums of the new data and the old to determine if
   2331  * allocating a new block is required.  Note that our requirements for
   2332  * cryptographic strength are fairly weak: there can't be any accidental
   2333  * hash collisions, but we don't need to be secure against intentional
   2334  * (malicious) collisions.  To trigger a nopwrite, you have to be able
   2335  * to write the file to begin with, and triggering an incorrect (hash
   2336  * collision) nopwrite is no worse than simply writing to the file.
   2337  * That said, there are no known attacks against the checksum algorithms
   2338  * used for nopwrite, assuming that the salt and the checksums
   2339  * themselves remain secret.
   2340  */
   2341 static int
   2342 zio_nop_write(zio_t *zio)
   2343 {
   2344 	blkptr_t *bp = zio->io_bp;
   2345 	blkptr_t *bp_orig = &zio->io_bp_orig;
   2346 	zio_prop_t *zp = &zio->io_prop;
   2347 
   2348 	ASSERT(BP_GET_LEVEL(bp) == 0);
   2349 	ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
   2350 	ASSERT(zp->zp_nopwrite);
   2351 	ASSERT(!zp->zp_dedup);
   2352 	ASSERT(zio->io_bp_override == NULL);
   2353 	ASSERT(IO_IS_ALLOCATING(zio));
   2354 
   2355 	/*
   2356 	 * Check to see if the original bp and the new bp have matching
   2357 	 * characteristics (i.e. same checksum, compression algorithms, etc).
   2358 	 * If they don't then just continue with the pipeline which will
   2359 	 * allocate a new bp.
   2360 	 */
   2361 	if (BP_IS_HOLE(bp_orig) ||
   2362 	    !(zio_checksum_table[BP_GET_CHECKSUM(bp)].ci_flags &
   2363 	    ZCHECKSUM_FLAG_NOPWRITE) ||
   2364 	    BP_GET_CHECKSUM(bp) != BP_GET_CHECKSUM(bp_orig) ||
   2365 	    BP_GET_COMPRESS(bp) != BP_GET_COMPRESS(bp_orig) ||
   2366 	    BP_GET_DEDUP(bp) != BP_GET_DEDUP(bp_orig) ||
   2367 	    zp->zp_copies != BP_GET_NDVAS(bp_orig))
   2368 		return (ZIO_PIPELINE_CONTINUE);
   2369 
   2370 	/*
   2371 	 * If the checksums match then reset the pipeline so that we
   2372 	 * avoid allocating a new bp and issuing any I/O.
   2373 	 */
   2374 	if (ZIO_CHECKSUM_EQUAL(bp->blk_cksum, bp_orig->blk_cksum)) {
   2375 		ASSERT(zio_checksum_table[zp->zp_checksum].ci_flags &
   2376 		    ZCHECKSUM_FLAG_NOPWRITE);
   2377 		ASSERT3U(BP_GET_PSIZE(bp), ==, BP_GET_PSIZE(bp_orig));
   2378 		ASSERT3U(BP_GET_LSIZE(bp), ==, BP_GET_LSIZE(bp_orig));
   2379 		ASSERT(zp->zp_compress != ZIO_COMPRESS_OFF);
   2380 		ASSERT(bcmp(&bp->blk_prop, &bp_orig->blk_prop,
   2381 		    sizeof (uint64_t)) == 0);
   2382 
   2383 		*bp = *bp_orig;
   2384 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   2385 		zio->io_flags |= ZIO_FLAG_NOPWRITE;
   2386 	}
   2387 
   2388 	return (ZIO_PIPELINE_CONTINUE);
   2389 }
   2390 
   2391 /*
   2392  * ==========================================================================
   2393  * Dedup
   2394  * ==========================================================================
   2395  */
   2396 static void
   2397 zio_ddt_child_read_done(zio_t *zio)
   2398 {
   2399 	blkptr_t *bp = zio->io_bp;
   2400 	ddt_entry_t *dde = zio->io_private;
   2401 	ddt_phys_t *ddp;
   2402 	zio_t *pio = zio_unique_parent(zio);
   2403 
   2404 	mutex_enter(&pio->io_lock);
   2405 	ddp = ddt_phys_select(dde, bp);
   2406 	if (zio->io_error == 0)
   2407 		ddt_phys_clear(ddp);	/* this ddp doesn't need repair */
   2408 	if (zio->io_error == 0 && dde->dde_repair_data == NULL)
   2409 		dde->dde_repair_data = zio->io_data;
   2410 	else
   2411 		zio_buf_free(zio->io_data, zio->io_size);
   2412 	mutex_exit(&pio->io_lock);
   2413 }
   2414 
   2415 static int
   2416 zio_ddt_read_start(zio_t *zio)
   2417 {
   2418 	blkptr_t *bp = zio->io_bp;
   2419 
   2420 	ASSERT(BP_GET_DEDUP(bp));
   2421 	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
   2422 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   2423 
   2424 	if (zio->io_child_error[ZIO_CHILD_DDT]) {
   2425 		ddt_t *ddt = ddt_select(zio->io_spa, bp);
   2426 		ddt_entry_t *dde = ddt_repair_start(ddt, bp);
   2427 		ddt_phys_t *ddp = dde->dde_phys;
   2428 		ddt_phys_t *ddp_self = ddt_phys_select(dde, bp);
   2429 		blkptr_t blk;
   2430 
   2431 		ASSERT(zio->io_vsd == NULL);
   2432 		zio->io_vsd = dde;
   2433 
   2434 		if (ddp_self == NULL)
   2435 			return (ZIO_PIPELINE_CONTINUE);
   2436 
   2437 		for (int p = 0; p < DDT_PHYS_TYPES; p++, ddp++) {
   2438 			if (ddp->ddp_phys_birth == 0 || ddp == ddp_self)
   2439 				continue;
   2440 			ddt_bp_create(ddt->ddt_checksum, &dde->dde_key, ddp,
   2441 			    &blk);
   2442 			zio_nowait(zio_read(zio, zio->io_spa, &blk,
   2443 			    zio_buf_alloc(zio->io_size), zio->io_size,
   2444 			    zio_ddt_child_read_done, dde, zio->io_priority,
   2445 			    ZIO_DDT_CHILD_FLAGS(zio) | ZIO_FLAG_DONT_PROPAGATE,
   2446 			    &zio->io_bookmark));
   2447 		}
   2448 		return (ZIO_PIPELINE_CONTINUE);
   2449 	}
   2450 
   2451 	zio_nowait(zio_read(zio, zio->io_spa, bp,
   2452 	    zio->io_data, zio->io_size, NULL, NULL, zio->io_priority,
   2453 	    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark));
   2454 
   2455 	return (ZIO_PIPELINE_CONTINUE);
   2456 }
   2457 
   2458 static int
   2459 zio_ddt_read_done(zio_t *zio)
   2460 {
   2461 	blkptr_t *bp = zio->io_bp;
   2462 
   2463 	if (zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE))
   2464 		return (ZIO_PIPELINE_STOP);
   2465 
   2466 	ASSERT(BP_GET_DEDUP(bp));
   2467 	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
   2468 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   2469 
   2470 	if (zio->io_child_error[ZIO_CHILD_DDT]) {
   2471 		ddt_t *ddt = ddt_select(zio->io_spa, bp);
   2472 		ddt_entry_t *dde = zio->io_vsd;
   2473 		if (ddt == NULL) {
   2474 			ASSERT(spa_load_state(zio->io_spa) != SPA_LOAD_NONE);
   2475 			return (ZIO_PIPELINE_CONTINUE);
   2476 		}
   2477 		if (dde == NULL) {
   2478 			zio->io_stage = ZIO_STAGE_DDT_READ_START >> 1;
   2479 			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
   2480 			return (ZIO_PIPELINE_STOP);
   2481 		}
   2482 		if (dde->dde_repair_data != NULL) {
   2483 			bcopy(dde->dde_repair_data, zio->io_data, zio->io_size);
   2484 			zio->io_child_error[ZIO_CHILD_DDT] = 0;
   2485 		}
   2486 		ddt_repair_done(ddt, dde);
   2487 		zio->io_vsd = NULL;
   2488 	}
   2489 
   2490 	ASSERT(zio->io_vsd == NULL);
   2491 
   2492 	return (ZIO_PIPELINE_CONTINUE);
   2493 }
   2494 
   2495 static boolean_t
   2496 zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
   2497 {
   2498 	spa_t *spa = zio->io_spa;
   2499 
   2500 	/*
   2501 	 * Note: we compare the original data, not the transformed data,
   2502 	 * because when zio->io_bp is an override bp, we will not have
   2503 	 * pushed the I/O transforms.  That's an important optimization
   2504 	 * because otherwise we'd compress/encrypt all dmu_sync() data twice.
   2505 	 */
   2506 	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
   2507 		zio_t *lio = dde->dde_lead_zio[p];
   2508 
   2509 		if (lio != NULL) {
   2510 			return (lio->io_orig_size != zio->io_orig_size ||
   2511 			    bcmp(zio->io_orig_data, lio->io_orig_data,
   2512 			    zio->io_orig_size) != 0);
   2513 		}
   2514 	}
   2515 
   2516 	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
   2517 		ddt_phys_t *ddp = &dde->dde_phys[p];
   2518 
   2519 		if (ddp->ddp_phys_birth != 0) {
   2520 			arc_buf_t *abuf = NULL;
   2521 			arc_flags_t aflags = ARC_FLAG_WAIT;
   2522 			blkptr_t blk = *zio->io_bp;
   2523 			int error;
   2524 
   2525 			ddt_bp_fill(ddp, &blk, ddp->ddp_phys_birth);
   2526 
   2527 			ddt_exit(ddt);
   2528 
   2529 			error = arc_read(NULL, spa, &blk,
   2530 			    arc_getbuf_func, &abuf, ZIO_PRIORITY_SYNC_READ,
   2531 			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
   2532 			    &aflags, &zio->io_bookmark);
   2533 
   2534 			if (error == 0) {
   2535 				if (arc_buf_size(abuf) != zio->io_orig_size ||
   2536 				    bcmp(abuf->b_data, zio->io_orig_data,
   2537 				    zio->io_orig_size) != 0)
   2538 					error = SET_ERROR(EEXIST);
   2539 				arc_buf_destroy(abuf, &abuf);
   2540 			}
   2541 
   2542 			ddt_enter(ddt);
   2543 			return (error != 0);
   2544 		}
   2545 	}
   2546 
   2547 	return (B_FALSE);
   2548 }
   2549 
   2550 static void
   2551 zio_ddt_child_write_ready(zio_t *zio)
   2552 {
   2553 	int p = zio->io_prop.zp_copies;
   2554 	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
   2555 	ddt_entry_t *dde = zio->io_private;
   2556 	ddt_phys_t *ddp = &dde->dde_phys[p];
   2557 	zio_t *pio;
   2558 
   2559 	if (zio->io_error)
   2560 		return;
   2561 
   2562 	ddt_enter(ddt);
   2563 
   2564 	ASSERT(dde->dde_lead_zio[p] == zio);
   2565 
   2566 	ddt_phys_fill(ddp, zio->io_bp);
   2567 
   2568 	zio_link_t *zl = NULL;
   2569 	while ((pio = zio_walk_parents(zio, &zl)) != NULL)
   2570 		ddt_bp_fill(ddp, pio->io_bp, zio->io_txg);
   2571 
   2572 	ddt_exit(ddt);
   2573 }
   2574 
   2575 static void
   2576 zio_ddt_child_write_done(zio_t *zio)
   2577 {
   2578 	int p = zio->io_prop.zp_copies;
   2579 	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
   2580 	ddt_entry_t *dde = zio->io_private;
   2581 	ddt_phys_t *ddp = &dde->dde_phys[p];
   2582 
   2583 	ddt_enter(ddt);
   2584 
   2585 	ASSERT(ddp->ddp_refcnt == 0);
   2586 	ASSERT(dde->dde_lead_zio[p] == zio);
   2587 	dde->dde_lead_zio[p] = NULL;
   2588 
   2589 	if (zio->io_error == 0) {
   2590 		zio_link_t *zl = NULL;
   2591 		while (zio_walk_parents(zio, &zl) != NULL)
   2592 			ddt_phys_addref(ddp);
   2593 	} else {
   2594 		ddt_phys_clear(ddp);
   2595 	}
   2596 
   2597 	ddt_exit(ddt);
   2598 }
   2599 
   2600 static void
   2601 zio_ddt_ditto_write_done(zio_t *zio)
   2602 {
   2603 	int p = DDT_PHYS_DITTO;
   2604 	zio_prop_t *zp = &zio->io_prop;
   2605 	blkptr_t *bp = zio->io_bp;
   2606 	ddt_t *ddt = ddt_select(zio->io_spa, bp);
   2607 	ddt_entry_t *dde = zio->io_private;
   2608 	ddt_phys_t *ddp = &dde->dde_phys[p];
   2609 	ddt_key_t *ddk = &dde->dde_key;
   2610 
   2611 	ddt_enter(ddt);
   2612 
   2613 	ASSERT(ddp->ddp_refcnt == 0);
   2614 	ASSERT(dde->dde_lead_zio[p] == zio);
   2615 	dde->dde_lead_zio[p] = NULL;
   2616 
   2617 	if (zio->io_error == 0) {
   2618 		ASSERT(ZIO_CHECKSUM_EQUAL(bp->blk_cksum, ddk->ddk_cksum));
   2619 		ASSERT(zp->zp_copies < SPA_DVAS_PER_BP);
   2620 		ASSERT(zp->zp_copies == BP_GET_NDVAS(bp) - BP_IS_GANG(bp));
   2621 		if (ddp->ddp_phys_birth != 0)
   2622 			ddt_phys_free(ddt, ddk, ddp, zio->io_txg);
   2623 		ddt_phys_fill(ddp, bp);
   2624 	}
   2625 
   2626 	ddt_exit(ddt);
   2627 }
   2628 
   2629 static int
   2630 zio_ddt_write(zio_t *zio)
   2631 {
   2632 	spa_t *spa = zio->io_spa;
   2633 	blkptr_t *bp = zio->io_bp;
   2634 	uint64_t txg = zio->io_txg;
   2635 	zio_prop_t *zp = &zio->io_prop;
   2636 	int p = zp->zp_copies;
   2637 	int ditto_copies;
   2638 	zio_t *cio = NULL;
   2639 	zio_t *dio = NULL;
   2640 	ddt_t *ddt = ddt_select(spa, bp);
   2641 	ddt_entry_t *dde;
   2642 	ddt_phys_t *ddp;
   2643 
   2644 	ASSERT(BP_GET_DEDUP(bp));
   2645 	ASSERT(BP_GET_CHECKSUM(bp) == zp->zp_checksum);
   2646 	ASSERT(BP_IS_HOLE(bp) || zio->io_bp_override);
   2647 
   2648 	ddt_enter(ddt);
   2649 	dde = ddt_lookup(ddt, bp, B_TRUE);
   2650 	ddp = &dde->dde_phys[p];
   2651 
   2652 	if (zp->zp_dedup_verify && zio_ddt_collision(zio, ddt, dde)) {
   2653 		/*
   2654 		 * If we're using a weak checksum, upgrade to a strong checksum
   2655 		 * and try again.  If we're already using a strong checksum,
   2656 		 * we can't resolve it, so just convert to an ordinary write.
   2657 		 * (And automatically e-mail a paper to Nature?)
   2658 		 */
   2659 		if (!(zio_checksum_table[zp->zp_checksum].ci_flags &
   2660 		    ZCHECKSUM_FLAG_DEDUP)) {
   2661 			zp->zp_checksum = spa_dedup_checksum(spa);
   2662 			zio_pop_transforms(zio);
   2663 			zio->io_stage = ZIO_STAGE_OPEN;
   2664 			BP_ZERO(bp);
   2665 		} else {
   2666 			zp->zp_dedup = B_FALSE;
   2667 		}
   2668 		zio->io_pipeline = ZIO_WRITE_PIPELINE;
   2669 		ddt_exit(ddt);
   2670 		return (ZIO_PIPELINE_CONTINUE);
   2671 	}
   2672 
   2673 	ditto_copies = ddt_ditto_copies_needed(ddt, dde, ddp);
   2674 	ASSERT(ditto_copies < SPA_DVAS_PER_BP);
   2675 
   2676 	if (ditto_copies > ddt_ditto_copies_present(dde) &&
   2677 	    dde->dde_lead_zio[DDT_PHYS_DITTO] == NULL) {
   2678 		zio_prop_t czp = *zp;
   2679 
   2680 		czp.zp_copies = ditto_copies;
   2681 
   2682 		/*
   2683 		 * If we arrived here with an override bp, we won't have run
   2684 		 * the transform stack, so we won't have the data we need to
   2685 		 * generate a child i/o.  So, toss the override bp and restart.
   2686 		 * This is safe, because using the override bp is just an
   2687 		 * optimization; and it's rare, so the cost doesn't matter.
   2688 		 */
   2689 		if (zio->io_bp_override) {
   2690 			zio_pop_transforms(zio);
   2691 			zio->io_stage = ZIO_STAGE_OPEN;
   2692 			zio->io_pipeline = ZIO_WRITE_PIPELINE;
   2693 			zio->io_bp_override = NULL;
   2694 			BP_ZERO(bp);
   2695 			ddt_exit(ddt);
   2696 			return (ZIO_PIPELINE_CONTINUE);
   2697 		}
   2698 
   2699 		dio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
   2700 		    zio->io_orig_size, &czp, NULL, NULL,
   2701 		    NULL, zio_ddt_ditto_write_done, dde, zio->io_priority,
   2702 		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
   2703 
   2704 		zio_push_transform(dio, zio->io_data, zio->io_size, 0, NULL);
   2705 		dde->dde_lead_zio[DDT_PHYS_DITTO] = dio;
   2706 	}
   2707 
   2708 	if (ddp->ddp_phys_birth != 0 || dde->dde_lead_zio[p] != NULL) {
   2709 		if (ddp->ddp_phys_birth != 0)
   2710 			ddt_bp_fill(ddp, bp, txg);
   2711 		if (dde->dde_lead_zio[p] != NULL)
   2712 			zio_add_child(zio, dde->dde_lead_zio[p]);
   2713 		else
   2714 			ddt_phys_addref(ddp);
   2715 	} else if (zio->io_bp_override) {
   2716 		ASSERT(bp->blk_birth == txg);
   2717 		ASSERT(BP_EQUAL(bp, zio->io_bp_override));
   2718 		ddt_phys_fill(ddp, bp);
   2719 		ddt_phys_addref(ddp);
   2720 	} else {
   2721 		cio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
   2722 		    zio->io_orig_size, zp,
   2723 		    zio_ddt_child_write_ready, NULL, NULL,
   2724 		    zio_ddt_child_write_done, dde, zio->io_priority,
   2725 		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
   2726 
   2727 		zio_push_transform(cio, zio->io_data, zio->io_size, 0, NULL);
   2728 		dde->dde_lead_zio[p] = cio;
   2729 	}
   2730 
   2731 	ddt_exit(ddt);
   2732 
   2733 	if (cio)
   2734 		zio_nowait(cio);
   2735 	if (dio)
   2736 		zio_nowait(dio);
   2737 
   2738 	return (ZIO_PIPELINE_CONTINUE);
   2739 }
   2740 
   2741 ddt_entry_t *freedde; /* for debugging */
   2742 
   2743 static int
   2744 zio_ddt_free(zio_t *zio)
   2745 {
   2746 	spa_t *spa = zio->io_spa;
   2747 	blkptr_t *bp = zio->io_bp;
   2748 	ddt_t *ddt = ddt_select(spa, bp);
   2749 	ddt_entry_t *dde;
   2750 	ddt_phys_t *ddp;
   2751 
   2752 	ASSERT(BP_GET_DEDUP(bp));
   2753 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   2754 
   2755 	ddt_enter(ddt);
   2756 	freedde = dde = ddt_lookup(ddt, bp, B_TRUE);
   2757 	ddp = ddt_phys_select(dde, bp);
   2758 	ddt_phys_decref(ddp);
   2759 	ddt_exit(ddt);
   2760 
   2761 	return (ZIO_PIPELINE_CONTINUE);
   2762 }
   2763 
   2764 /*
   2765  * ==========================================================================
   2766  * Allocate and free blocks
   2767  * ==========================================================================
   2768  */
   2769 
   2770 static zio_t *
   2771 zio_io_to_allocate(spa_t *spa)
   2772 {
   2773 	zio_t *zio;
   2774 
   2775 	ASSERT(MUTEX_HELD(&spa->spa_alloc_lock));
   2776 
   2777 	zio = avl_first(&spa->spa_alloc_tree);
   2778 	if (zio == NULL)
   2779 		return (NULL);
   2780 
   2781 	ASSERT(IO_IS_ALLOCATING(zio));
   2782 
   2783 	/*
   2784 	 * Try to place a reservation for this zio. If we're unable to
   2785 	 * reserve then we throttle.
   2786 	 */
   2787 	if (!metaslab_class_throttle_reserve(spa_normal_class(spa),
   2788 	    zio->io_prop.zp_copies, zio, 0)) {
   2789 		return (NULL);
   2790 	}
   2791 
   2792 	avl_remove(&spa->spa_alloc_tree, zio);
   2793 	ASSERT3U(zio->io_stage, <, ZIO_STAGE_DVA_ALLOCATE);
   2794 
   2795 	return (zio);
   2796 }
   2797 
   2798 static int
   2799 zio_dva_throttle(zio_t *zio)
   2800 {
   2801 	spa_t *spa = zio->io_spa;
   2802 	zio_t *nio;
   2803 
   2804 	if (zio->io_priority == ZIO_PRIORITY_SYNC_WRITE ||
   2805 	    !spa_normal_class(zio->io_spa)->mc_alloc_throttle_enabled ||
   2806 	    zio->io_child_type == ZIO_CHILD_GANG ||
   2807 	    zio->io_flags & ZIO_FLAG_NODATA) {
   2808 		return (ZIO_PIPELINE_CONTINUE);
   2809 	}
   2810 
   2811 	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
   2812 
   2813 	ASSERT3U(zio->io_queued_timestamp, >, 0);
   2814 	ASSERT(zio->io_stage == ZIO_STAGE_DVA_THROTTLE);
   2815 
   2816 	mutex_enter(&spa->spa_alloc_lock);
   2817 
   2818 	ASSERT(zio->io_type == ZIO_TYPE_WRITE);
   2819 	avl_add(&spa->spa_alloc_tree, zio);
   2820 
   2821 	nio = zio_io_to_allocate(zio->io_spa);
   2822 	mutex_exit(&spa->spa_alloc_lock);
   2823 
   2824 	if (nio == zio)
   2825 		return (ZIO_PIPELINE_CONTINUE);
   2826 
   2827 	if (nio != NULL) {
   2828 		ASSERT3U(nio->io_queued_timestamp, <=,
   2829 		    zio->io_queued_timestamp);
   2830 		ASSERT(nio->io_stage == ZIO_STAGE_DVA_THROTTLE);
   2831 		/*
   2832 		 * We are passing control to a new zio so make sure that
   2833 		 * it is processed by a different thread. We do this to
   2834 		 * avoid stack overflows that can occur when parents are
   2835 		 * throttled and children are making progress. We allow
   2836 		 * it to go to the head of the taskq since it's already
   2837 		 * been waiting.
   2838 		 */
   2839 		zio_taskq_dispatch(nio, ZIO_TASKQ_ISSUE, B_TRUE);
   2840 	}
   2841 	return (ZIO_PIPELINE_STOP);
   2842 }
   2843 
   2844 void
   2845 zio_allocate_dispatch(spa_t *spa)
   2846 {
   2847 	zio_t *zio;
   2848 
   2849 	mutex_enter(&spa->spa_alloc_lock);
   2850 	zio = zio_io_to_allocate(spa);
   2851 	mutex_exit(&spa->spa_alloc_lock);
   2852 	if (zio == NULL)
   2853 		return;
   2854 
   2855 	ASSERT3U(zio->io_stage, ==, ZIO_STAGE_DVA_THROTTLE);
   2856 	ASSERT0(zio->io_error);
   2857 	zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_TRUE);
   2858 }
   2859 
   2860 static int
   2861 zio_dva_allocate(zio_t *zio)
   2862 {
   2863 	spa_t *spa = zio->io_spa;
   2864 	metaslab_class_t *mc = spa_normal_class(spa);
   2865 	blkptr_t *bp = zio->io_bp;
   2866 	int error;
   2867 	int flags = 0;
   2868 
   2869 	if (zio->io_gang_leader == NULL) {
   2870 		ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
   2871 		zio->io_gang_leader = zio;
   2872 	}
   2873 
   2874 	ASSERT(BP_IS_HOLE(bp));
   2875 	ASSERT0(BP_GET_NDVAS(bp));
   2876 	ASSERT3U(zio->io_prop.zp_copies, >, 0);
   2877 	ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
   2878 	ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
   2879 
   2880 	if (zio->io_flags & ZIO_FLAG_NODATA) {
   2881 		flags |= METASLAB_DONT_THROTTLE;
   2882 	}
   2883 	if (zio->io_flags & ZIO_FLAG_GANG_CHILD) {
   2884 		flags |= METASLAB_GANG_CHILD;
   2885 	}
   2886 	if (zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE) {
   2887 		flags |= METASLAB_ASYNC_ALLOC;
   2888 	}
   2889 
   2890 	error = metaslab_alloc(spa, mc, zio->io_size, bp,
   2891 	    zio->io_prop.zp_copies, zio->io_txg, NULL, flags,
   2892 	    &zio->io_alloc_list, zio);
   2893 
   2894 	if (error != 0) {
   2895 		spa_dbgmsg(spa, "%s: metaslab allocation failure: zio %p, "
   2896 		    "size %llu, error %d", spa_name(spa), zio, zio->io_size,
   2897 		    error);
   2898 		if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE)
   2899 			return (zio_write_gang_block(zio));
   2900 		zio->io_error = error;
   2901 	}
   2902 
   2903 	return (ZIO_PIPELINE_CONTINUE);
   2904 }
   2905 
   2906 static int
   2907 zio_dva_free(zio_t *zio)
   2908 {
   2909 	metaslab_free(zio->io_spa, zio->io_bp, zio->io_txg, B_FALSE);
   2910 
   2911 	return (ZIO_PIPELINE_CONTINUE);
   2912 }
   2913 
   2914 static int
   2915 zio_dva_claim(zio_t *zio)
   2916 {
   2917 	int error;
   2918 
   2919 	error = metaslab_claim(zio->io_spa, zio->io_bp, zio->io_txg);
   2920 	if (error)
   2921 		zio->io_error = error;
   2922 
   2923 	return (ZIO_PIPELINE_CONTINUE);
   2924 }
   2925 
   2926 /*
   2927  * Undo an allocation.  This is used by zio_done() when an I/O fails
   2928  * and we want to give back the block we just allocated.
   2929  * This handles both normal blocks and gang blocks.
   2930  */
   2931 static void
   2932 zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
   2933 {
   2934 	ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
   2935 	ASSERT(zio->io_bp_override == NULL);
   2936 
   2937 	if (!BP_IS_HOLE(bp))
   2938 		metaslab_free(zio->io_spa, bp, bp->blk_birth, B_TRUE);
   2939 
   2940 	if (gn != NULL) {
   2941 		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
   2942 			zio_dva_unallocate(zio, gn->gn_child[g],
   2943 			    &gn->gn_gbh->zg_blkptr[g]);
   2944 		}
   2945 	}
   2946 }
   2947 
   2948 /*
   2949  * Try to allocate an intent log block.  Return 0 on success, errno on failure.
   2950  */
   2951 int
   2952 zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, blkptr_t *old_bp,
   2953     uint64_t size, boolean_t *slog)
   2954 {
   2955 	int error = 1;
   2956 	zio_alloc_list_t io_alloc_list;
   2957 
   2958 	ASSERT(txg > spa_syncing_txg(spa));
   2959 
   2960 	metaslab_trace_init(&io_alloc_list);
   2961 	error = metaslab_alloc(spa, spa_log_class(spa), size, new_bp, 1,
   2962 	    txg, old_bp, METASLAB_HINTBP_AVOID, &io_alloc_list, NULL);
   2963 	if (error == 0) {
   2964 		*slog = TRUE;
   2965 	} else {
   2966 		error = metaslab_alloc(spa, spa_normal_class(spa), size,
   2967 		    new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID,
   2968 		    &io_alloc_list, NULL);
   2969 		if (error == 0)
   2970 			*slog = FALSE;
   2971 	}
   2972 	metaslab_trace_fini(&io_alloc_list);
   2973 
   2974 	if (error == 0) {
   2975 		BP_SET_LSIZE(new_bp, size);
   2976 		BP_SET_PSIZE(new_bp, size);
   2977 		BP_SET_COMPRESS(new_bp, ZIO_COMPRESS_OFF);
   2978 		BP_SET_CHECKSUM(new_bp,
   2979 		    spa_version(spa) >= SPA_VERSION_SLIM_ZIL
   2980 		    ? ZIO_CHECKSUM_ZILOG2 : ZIO_CHECKSUM_ZILOG);
   2981 		BP_SET_TYPE(new_bp, DMU_OT_INTENT_LOG);
   2982 		BP_SET_LEVEL(new_bp, 0);
   2983 		BP_SET_DEDUP(new_bp, 0);
   2984 		BP_SET_BYTEORDER(new_bp, ZFS_HOST_BYTEORDER);
   2985 	}
   2986 
   2987 	return (error);
   2988 }
   2989 
   2990 /*
   2991  * Free an intent log block.
   2992  */
   2993 void
   2994 zio_free_zil(spa_t *spa, uint64_t txg, blkptr_t *bp)
   2995 {
   2996 	ASSERT(BP_GET_TYPE(bp) == DMU_OT_INTENT_LOG);
   2997 	ASSERT(!BP_IS_GANG(bp));
   2998 
   2999 	zio_free(spa, txg, bp);
   3000 }
   3001 
   3002 /*
   3003  * ==========================================================================
   3004  * Read, write and delete to physical devices
   3005  * ==========================================================================
   3006  */
   3007 
   3008 
   3009 /*
   3010  * Issue an I/O to the underlying vdev. Typically the issue pipeline
   3011  * stops after this stage and will resume upon I/O completion.
   3012  * However, there are instances where the vdev layer may need to
   3013  * continue the pipeline when an I/O was not issued. Since the I/O
   3014  * that was sent to the vdev layer might be different than the one
   3015  * currently active in the pipeline (see vdev_queue_io()), we explicitly
   3016  * force the underlying vdev layers to call either zio_execute() or
   3017  * zio_interrupt() to ensure that the pipeline continues with the correct I/O.
   3018  */
   3019 static int
   3020 zio_vdev_io_start(zio_t *zio)
   3021 {
   3022 	vdev_t *vd = zio->io_vd;
   3023 	uint64_t align;
   3024 	spa_t *spa = zio->io_spa;
   3025 	int ret;
   3026 
   3027 	ASSERT(zio->io_error == 0);
   3028 	ASSERT(zio->io_child_error[ZIO_CHILD_VDEV] == 0);
   3029 
   3030 	if (vd == NULL) {
   3031 		if (!(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
   3032 			spa_config_enter(spa, SCL_ZIO, zio, RW_READER);
   3033 
   3034 		/*
   3035 		 * The mirror_ops handle multiple DVAs in a single BP.
   3036 		 */
   3037 		vdev_mirror_ops.vdev_op_io_start(zio);
   3038 		return (ZIO_PIPELINE_STOP);
   3039 	}
   3040 
   3041 	if (vd->vdev_ops->vdev_op_leaf && zio->io_type == ZIO_TYPE_FREE &&
   3042 	    zio->io_priority == ZIO_PRIORITY_NOW) {
   3043 		trim_map_free(vd, zio->io_offset, zio->io_size, zio->io_txg);
   3044 		return (ZIO_PIPELINE_CONTINUE);
   3045 	}
   3046 
   3047 	ASSERT3P(zio->io_logical, !=, zio);
   3048 
   3049 	/*
   3050 	 * We keep track of time-sensitive I/Os so that the scan thread
   3051 	 * can quickly react to certain workloads.  In particular, we care
   3052 	 * about non-scrubbing, top-level reads and writes with the following
   3053 	 * characteristics:
   3054 	 *	- synchronous writes of user data to non-slog devices
   3055 	 *	- any reads of user data
   3056 	 * When these conditions are met, adjust the timestamp of spa_last_io
   3057 	 * which allows the scan thread to adjust its workload accordingly.
   3058 	 */
   3059 	if (!(zio->io_flags & ZIO_FLAG_SCAN_THREAD) && zio->io_bp != NULL &&
   3060 	    vd == vd->vdev_top && !vd->vdev_islog &&
   3061 	    zio->io_bookmark.zb_objset != DMU_META_OBJSET &&
   3062 	    zio->io_txg != spa_syncing_txg(spa)) {
   3063 		uint64_t old = spa->spa_last_io;
   3064 		uint64_t new = ddi_get_lbolt64();
   3065 		if (old != new)
   3066 			(void) atomic_cas_64(&spa->spa_last_io, old, new);
   3067 	}
   3068 
   3069 	align = 1ULL << vd->vdev_top->vdev_ashift;
   3070 
   3071 	if (!(zio->io_flags & ZIO_FLAG_PHYSICAL) &&
   3072 	    P2PHASE(zio->io_size, align) != 0) {
   3073 		/* Transform logical writes to be a full physical block size. */
   3074 		uint64_t asize = P2ROUNDUP(zio->io_size, align);
   3075 		char *abuf = NULL;
   3076 		if (zio->io_type == ZIO_TYPE_READ ||
   3077 		    zio->io_type == ZIO_TYPE_WRITE)
   3078 			abuf = zio_buf_alloc(asize);
   3079 		ASSERT(vd == vd->vdev_top);
   3080 		if (zio->io_type == ZIO_TYPE_WRITE) {
   3081 			bcopy(zio->io_data, abuf, zio->io_size);
   3082 			bzero(abuf + zio->io_size, asize - zio->io_size);
   3083 		}
   3084 		zio_push_transform(zio, abuf, asize, abuf ? asize : 0,
   3085 		    zio_subblock);
   3086 	}
   3087 
   3088 	/*
   3089 	 * If this is not a physical io, make sure that it is properly aligned
   3090 	 * before proceeding.
   3091 	 */
   3092 	if (!(zio->io_flags & ZIO_FLAG_PHYSICAL)) {
   3093 		ASSERT0(P2PHASE(zio->io_offset, align));
   3094 		ASSERT0(P2PHASE(zio->io_size, align));
   3095 	} else {
   3096 		/*
   3097 		 * For the physical io we allow alignment
   3098 		 * to a logical block size.
   3099 		 */
   3100 		uint64_t log_align =
   3101 		    1ULL << vd->vdev_top->vdev_logical_ashift;
   3102 		ASSERT0(P2PHASE(zio->io_offset, log_align));
   3103 		ASSERT0(P2PHASE(zio->io_size, log_align));
   3104 	}
   3105 
   3106 	VERIFY(zio->io_type == ZIO_TYPE_READ || spa_writeable(spa));
   3107 
   3108 	/*
   3109 	 * If this is a repair I/O, and there's no self-healing involved --
   3110 	 * that is, we're just resilvering what we expect to resilver --
   3111 	 * then don't do the I/O unless zio's txg is actually in vd's DTL.
   3112 	 * This prevents spurious resilvering with nested replication.
   3113 	 * For example, given a mirror of mirrors, (A+B)+(C+D), if only
   3114 	 * A is out of date, we'll read from C+D, then use the data to
   3115 	 * resilver A+B -- but we don't actually want to resilver B, just A.
   3116 	 * The top-level mirror has no way to know this, so instead we just
   3117 	 * discard unnecessary repairs as we work our way down the vdev tree.
   3118 	 * The same logic applies to any form of nested replication:
   3119 	 * ditto + mirror, RAID-Z + replacing, etc.  This covers them all.
   3120 	 */
   3121 	if ((zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
   3122 	    !(zio->io_flags & ZIO_FLAG_SELF_HEAL) &&
   3123 	    zio->io_txg != 0 &&	/* not a delegated i/o */
   3124 	    !vdev_dtl_contains(vd, DTL_PARTIAL, zio->io_txg, 1)) {
   3125 		ASSERT(zio->io_type == ZIO_TYPE_WRITE);
   3126 		zio_vdev_io_bypass(zio);
   3127 		return (ZIO_PIPELINE_CONTINUE);
   3128 	}
   3129 
   3130 	if (vd->vdev_ops->vdev_op_leaf) {
   3131 		switch (zio->io_type) {
   3132 		case ZIO_TYPE_READ:
   3133 			if (vdev_cache_read(zio))
   3134 				return (ZIO_PIPELINE_CONTINUE);
   3135 			/* FALLTHROUGH */
   3136 		case ZIO_TYPE_WRITE:
   3137 		case ZIO_TYPE_FREE:
   3138 			if ((zio = vdev_queue_io(zio)) == NULL)
   3139 				return (ZIO_PIPELINE_STOP);
   3140 
   3141 			if (!vdev_accessible(vd, zio)) {
   3142 				zio->io_error = SET_ERROR(ENXIO);
   3143 				zio_interrupt(zio);
   3144 				return (ZIO_PIPELINE_STOP);
   3145 			}
   3146 			break;
   3147 		}
   3148 		/*
   3149 		 * Note that we ignore repair writes for TRIM because they can
   3150 		 * conflict with normal writes. This isn't an issue because, by
   3151 		 * definition, we only repair blocks that aren't freed.
   3152 		 */
   3153 		if (zio->io_type == ZIO_TYPE_WRITE &&
   3154 		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
   3155 		    !trim_map_write_start(zio))
   3156 			return (ZIO_PIPELINE_STOP);
   3157 	}
   3158 
   3159 	vd->vdev_ops->vdev_op_io_start(zio);
   3160 	return (ZIO_PIPELINE_STOP);
   3161 }
   3162 
   3163 static int
   3164 zio_vdev_io_done(zio_t *zio)
   3165 {
   3166 	vdev_t *vd = zio->io_vd;
   3167 	vdev_ops_t *ops = vd ? vd->vdev_ops : &vdev_mirror_ops;
   3168 	boolean_t unexpected_error = B_FALSE;
   3169 
   3170 	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
   3171 		return (ZIO_PIPELINE_STOP);
   3172 
   3173 	ASSERT(zio->io_type == ZIO_TYPE_READ ||
   3174 	    zio->io_type == ZIO_TYPE_WRITE || zio->io_type == ZIO_TYPE_FREE);
   3175 
   3176 	if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
   3177 	    (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE ||
   3178 	    zio->io_type == ZIO_TYPE_FREE)) {
   3179 
   3180 		if (zio->io_type == ZIO_TYPE_WRITE &&
   3181 		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR))
   3182 			trim_map_write_done(zio);
   3183 
   3184 		vdev_queue_io_done(zio);
   3185 
   3186 		if (zio->io_type == ZIO_TYPE_WRITE)
   3187 			vdev_cache_write(zio);
   3188 
   3189 		if (zio_injection_enabled && zio->io_error == 0)
   3190 			zio->io_error = zio_handle_device_injection(vd,
   3191 			    zio, EIO);
   3192 
   3193 		if (zio_injection_enabled && zio->io_error == 0)
   3194 			zio->io_error = zio_handle_label_injection(zio, EIO);
   3195 
   3196 		if (zio->io_error) {
   3197 			if (zio->io_error == ENOTSUP &&
   3198 			    zio->io_type == ZIO_TYPE_FREE) {
   3199 				/* Not all devices support TRIM. */
   3200 			} else if (!vdev_accessible(vd, zio)) {
   3201 				zio->io_error = SET_ERROR(ENXIO);
   3202 			} else {
   3203 				unexpected_error = B_TRUE;
   3204 			}
   3205 		}
   3206 	}
   3207 
   3208 	ops->vdev_op_io_done(zio);
   3209 
   3210 	if (unexpected_error)
   3211 		VERIFY(vdev_probe(vd, zio) == NULL);
   3212 
   3213 	return (ZIO_PIPELINE_CONTINUE);
   3214 }
   3215 
   3216 /*
   3217  * For non-raidz ZIOs, we can just copy aside the bad data read from the
   3218  * disk, and use that to finish the checksum ereport later.
   3219  */
   3220 static void
   3221 zio_vsd_default_cksum_finish(zio_cksum_report_t *zcr,
   3222     const void *good_buf)
   3223 {
   3224 	/* no processing needed */
   3225 	zfs_ereport_finish_checksum(zcr, good_buf, zcr->zcr_cbdata, B_FALSE);
   3226 }
   3227 
   3228 /*ARGSUSED*/
   3229 void
   3230 zio_vsd_default_cksum_report(zio_t *zio, zio_cksum_report_t *zcr, void *ignored)
   3231 {
   3232 	void *buf = zio_buf_alloc(zio->io_size);
   3233 
   3234 	bcopy(zio->io_data, buf, zio->io_size);
   3235 
   3236 	zcr->zcr_cbinfo = zio->io_size;
   3237 	zcr->zcr_cbdata = buf;
   3238 	zcr->zcr_finish = zio_vsd_default_cksum_finish;
   3239 	zcr->zcr_free = zio_buf_free;
   3240 }
   3241 
   3242 static int
   3243 zio_vdev_io_assess(zio_t *zio)
   3244 {
   3245 	vdev_t *vd = zio->io_vd;
   3246 
   3247 	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
   3248 		return (ZIO_PIPELINE_STOP);
   3249 
   3250 	if (vd == NULL && !(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
   3251 		spa_config_exit(zio->io_spa, SCL_ZIO, zio);
   3252 
   3253 	if (zio->io_vsd != NULL) {
   3254 		zio->io_vsd_ops->vsd_free(zio);
   3255 		zio->io_vsd = NULL;
   3256 	}
   3257 
   3258 	if (zio_injection_enabled && zio->io_error == 0)
   3259 		zio->io_error = zio_handle_fault_injection(zio, EIO);
   3260 
   3261 	if (zio->io_type == ZIO_TYPE_FREE &&
   3262 	    zio->io_priority != ZIO_PRIORITY_NOW) {
   3263 		switch (zio->io_error) {
   3264 		case 0:
   3265 			ZIO_TRIM_STAT_INCR(bytes, zio->io_size);
   3266 			ZIO_TRIM_STAT_BUMP(success);
   3267 			break;
   3268 		case EOPNOTSUPP:
   3269 			ZIO_TRIM_STAT_BUMP(unsupported);
   3270 			break;
   3271 		default:
   3272 			ZIO_TRIM_STAT_BUMP(failed);
   3273 			break;
   3274 		}
   3275 	}
   3276 
   3277 	/*
   3278 	 * If the I/O failed, determine whether we should attempt to retry it.
   3279 	 *
   3280 	 * On retry, we cut in line in the issue queue, since we don't want
   3281 	 * compression/checksumming/etc. work to prevent our (cheap) IO reissue.
   3282 	 */
   3283 	if (zio->io_error && vd == NULL &&
   3284 	    !(zio->io_flags & (ZIO_FLAG_DONT_RETRY | ZIO_FLAG_IO_RETRY))) {
   3285 		ASSERT(!(zio->io_flags & ZIO_FLAG_DONT_QUEUE));	/* not a leaf */
   3286 		ASSERT(!(zio->io_flags & ZIO_FLAG_IO_BYPASS));	/* not a leaf */
   3287 		zio->io_error = 0;
   3288 		zio->io_flags |= ZIO_FLAG_IO_RETRY |
   3289 		    ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_AGGREGATE;
   3290 		zio->io_stage = ZIO_STAGE_VDEV_IO_START >> 1;
   3291 		zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE,
   3292 		    zio_requeue_io_start_cut_in_line);
   3293 		return (ZIO_PIPELINE_STOP);
   3294 	}
   3295 
   3296 	/*
   3297 	 * If we got an error on a leaf device, convert it to ENXIO
   3298 	 * if the device is not accessible at all.
   3299 	 */
   3300 	if (zio->io_error && vd != NULL && vd->vdev_ops->vdev_op_leaf &&
   3301 	    !vdev_accessible(vd, zio))
   3302 		zio->io_error = SET_ERROR(ENXIO);
   3303 
   3304 	/*
   3305 	 * If we can't write to an interior vdev (mirror or RAID-Z),
   3306 	 * set vdev_cant_write so that we stop trying to allocate from it.
   3307 	 */
   3308 	if (zio->io_error == ENXIO && zio->io_type == ZIO_TYPE_WRITE &&
   3309 	    vd != NULL && !vd->vdev_ops->vdev_op_leaf) {
   3310 		vd->vdev_cant_write = B_TRUE;
   3311 	}
   3312 
   3313 	if (zio->io_error)
   3314 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   3315 
   3316 	if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
   3317 	    zio->io_physdone != NULL) {
   3318 		ASSERT(!(zio->io_flags & ZIO_FLAG_DELEGATED));
   3319 		ASSERT(zio->io_child_type == ZIO_CHILD_VDEV);
   3320 		zio->io_physdone(zio->io_logical);
   3321 	}
   3322 
   3323 	return (ZIO_PIPELINE_CONTINUE);
   3324 }
   3325 
   3326 void
   3327 zio_vdev_io_reissue(zio_t *zio)
   3328 {
   3329 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
   3330 	ASSERT(zio->io_error == 0);
   3331 
   3332 	zio->io_stage >>= 1;
   3333 }
   3334 
   3335 void
   3336 zio_vdev_io_redone(zio_t *zio)
   3337 {
   3338 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_DONE);
   3339 
   3340 	zio->io_stage >>= 1;
   3341 }
   3342 
   3343 void
   3344 zio_vdev_io_bypass(zio_t *zio)
   3345 {
   3346 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
   3347 	ASSERT(zio->io_error == 0);
   3348 
   3349 	zio->io_flags |= ZIO_FLAG_IO_BYPASS;
   3350 	zio->io_stage = ZIO_STAGE_VDEV_IO_ASSESS >> 1;
   3351 }
   3352 
   3353 /*
   3354  * ==========================================================================
   3355  * Generate and verify checksums
   3356  * ==========================================================================
   3357  */
   3358 static int
   3359 zio_checksum_generate(zio_t *zio)
   3360 {
   3361 	blkptr_t *bp = zio->io_bp;
   3362 	enum zio_checksum checksum;
   3363 
   3364 	if (bp == NULL) {
   3365 		/*
   3366 		 * This is zio_write_phys().
   3367 		 * We're either generating a label checksum, or none at all.
   3368 		 */
   3369 		checksum = zio->io_prop.zp_checksum;
   3370 
   3371 		if (checksum == ZIO_CHECKSUM_OFF)
   3372 			return (ZIO_PIPELINE_CONTINUE);
   3373 
   3374 		ASSERT(checksum == ZIO_CHECKSUM_LABEL);
   3375 	} else {
   3376 		if (BP_IS_GANG(bp) && zio->io_child_type == ZIO_CHILD_GANG) {
   3377 			ASSERT(!IO_IS_ALLOCATING(zio));
   3378 			checksum = ZIO_CHECKSUM_GANG_HEADER;
   3379 		} else {
   3380 			checksum = BP_GET_CHECKSUM(bp);
   3381 		}
   3382 	}
   3383 
   3384 	zio_checksum_compute(zio, checksum, zio->io_data, zio->io_size);
   3385 
   3386 	return (ZIO_PIPELINE_CONTINUE);
   3387 }
   3388 
   3389 static int
   3390 zio_checksum_verify(zio_t *zio)
   3391 {
   3392 	zio_bad_cksum_t info;
   3393 	blkptr_t *bp = zio->io_bp;
   3394 	int error;
   3395 
   3396 	ASSERT(zio->io_vd != NULL);
   3397 
   3398 	if (bp == NULL) {
   3399 		/*
   3400 		 * This is zio_read_phys().
   3401 		 * We're either verifying a label checksum, or nothing at all.
   3402 		 */
   3403 		if (zio->io_prop.zp_checksum == ZIO_CHECKSUM_OFF)
   3404 			return (ZIO_PIPELINE_CONTINUE);
   3405 
   3406 		ASSERT(zio->io_prop.zp_checksum == ZIO_CHECKSUM_LABEL);
   3407 	}
   3408 
   3409 	if ((error = zio_checksum_error(zio, &info)) != 0) {
   3410 		zio->io_error = error;
   3411 		if (error == ECKSUM &&
   3412 		    !(zio->io_flags & ZIO_FLAG_SPECULATIVE)) {
   3413 			zfs_ereport_start_checksum(zio->io_spa,
   3414 			    zio->io_vd, zio, zio->io_offset,
   3415 			    zio->io_size, NULL, &info);
   3416 		}
   3417 	}
   3418 
   3419 	return (ZIO_PIPELINE_CONTINUE);
   3420 }
   3421 
   3422 /*
   3423  * Called by RAID-Z to ensure we don't compute the checksum twice.
   3424  */
   3425 void
   3426 zio_checksum_verified(zio_t *zio)
   3427 {
   3428 	zio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
   3429 }
   3430 
/*
 * ==========================================================================
 * Error rank.  Errors are ranked in the order 0, ENXIO, ECKSUM, EIO, other.
 * An error of 0 indicates success.  ENXIO indicates whole-device failure,
 * which may be transient (e.g. unplugged) or permanent.  ECKSUM and EIO
 * indicate errors that are specific to one I/O, and most likely permanent.
 * Any other error is presumed to be worse because we weren't expecting it.
 * ==========================================================================
 */

/*
 * Return the rank of 'error': its index in the ordered table of known
 * errors, or the table length (i.e. worse than every known error) when
 * the error is not listed.
 */
static size_t
zio_error_rank_index(int error)
{
	static const int zio_error_rank[] = { 0, ENXIO, ECKSUM, EIO };
	const size_t nranks =
	    sizeof (zio_error_rank) / sizeof (zio_error_rank[0]);
	size_t r;

	for (r = 0; r < nranks; r++) {
		if (error == zio_error_rank[r])
			break;
	}

	return (r);
}

/*
 * Return whichever of e1 and e2 ranks worse.  When both have the same
 * rank (including two different unlisted errors), e2 is returned.
 */
int
zio_worst_error(int e1, int e2)
{
	size_t r1 = zio_error_rank_index(e1);
	size_t r2 = zio_error_rank_index(e2);

	return (r1 > r2 ? e1 : e2);
}
   3456 
   3457 /*
   3458  * ==========================================================================
   3459  * I/O completion
   3460  * ==========================================================================
   3461  */
   3462 static int
   3463 zio_ready(zio_t *zio)
   3464 {
   3465 	blkptr_t *bp = zio->io_bp;
   3466 	zio_t *pio, *pio_next;
   3467 	zio_link_t *zl = NULL;
   3468 
   3469 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
   3470 	    zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_READY))
   3471 		return (ZIO_PIPELINE_STOP);
   3472 
   3473 	if (zio->io_ready) {
   3474 		ASSERT(IO_IS_ALLOCATING(zio));
   3475 		ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp) ||
   3476 		    (zio->io_flags & ZIO_FLAG_NOPWRITE));
   3477 		ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
   3478 
   3479 		zio->io_ready(zio);
   3480 	}
   3481 
   3482 	if (bp != NULL && bp != &zio->io_bp_copy)
   3483 		zio->io_bp_copy = *bp;
   3484 
   3485 	if (zio->io_error != 0) {
   3486 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   3487 
   3488 		if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
   3489 			ASSERT(IO_IS_ALLOCATING(zio));
   3490 			ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
   3491 			/*
   3492 			 * We were unable to allocate anything, unreserve and
   3493 			 * issue the next I/O to allocate.
   3494 			 */
   3495 			metaslab_class_throttle_unreserve(
   3496 			    spa_normal_class(zio->io_spa),
   3497 			    zio->io_prop.zp_copies, zio);
   3498 			zio_allocate_dispatch(zio->io_spa);
   3499 		}
   3500 	}
   3501 
   3502 	mutex_enter(&zio->io_lock);
   3503 	zio->io_state[ZIO_WAIT_READY] = 1;
   3504 	pio = zio_walk_parents(zio, &zl);
   3505 	mutex_exit(&zio->io_lock);
   3506 
   3507 	/*
   3508 	 * As we notify zio's parents, new parents could be added.
   3509 	 * New parents go to the head of zio's io_parent_list, however,
   3510 	 * so we will (correctly) not notify them.  The remainder of zio's
   3511 	 * io_parent_list, from 'pio_next' onward, cannot change because
   3512 	 * all parents must wait for us to be done before they can be done.
   3513 	 */
   3514 	for (; pio != NULL; pio = pio_next) {
   3515 		pio_next = zio_walk_parents(zio, &zl);
   3516 		zio_notify_parent(pio, zio, ZIO_WAIT_READY);
   3517 	}
   3518 
   3519 	if (zio->io_flags & ZIO_FLAG_NODATA) {
   3520 		if (BP_IS_GANG(bp)) {
   3521 			zio->io_flags &= ~ZIO_FLAG_NODATA;
   3522 		} else {
   3523 			ASSERT((uintptr_t)zio->io_data < SPA_MAXBLOCKSIZE);
   3524 			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
   3525 		}
   3526 	}
   3527 
   3528 	if (zio_injection_enabled &&
   3529 	    zio->io_spa->spa_syncing_txg == zio->io_txg)
   3530 		zio_handle_ignored_writes(zio);
   3531 
   3532 	return (ZIO_PIPELINE_CONTINUE);
   3533 }
   3534 
   3535 /*
   3536  * Update the allocation throttle accounting.
   3537  */
   3538 static void
   3539 zio_dva_throttle_done(zio_t *zio)
   3540 {
   3541 	zio_t *lio = zio->io_logical;
   3542 	zio_t *pio = zio_unique_parent(zio);
   3543 	vdev_t *vd = zio->io_vd;
   3544 	int flags = METASLAB_ASYNC_ALLOC;
   3545 
   3546 	ASSERT3P(zio->io_bp, !=, NULL);
   3547 	ASSERT3U(zio->io_type, ==, ZIO_TYPE_WRITE);
   3548 	ASSERT3U(zio->io_priority, ==, ZIO_PRIORITY_ASYNC_WRITE);
   3549 	ASSERT3U(zio->io_child_type, ==, ZIO_CHILD_VDEV);
   3550 	ASSERT(vd != NULL);
   3551 	ASSERT3P(vd, ==, vd->vdev_top);
   3552 	ASSERT(!(zio->io_flags & (ZIO_FLAG_IO_REPAIR | ZIO_FLAG_IO_RETRY)));
   3553 	ASSERT(zio->io_flags & ZIO_FLAG_IO_ALLOCATING);
   3554 	ASSERT(!(lio->io_flags & ZIO_FLAG_IO_REWRITE));
   3555 	ASSERT(!(lio->io_orig_flags & ZIO_FLAG_NODATA));
   3556 
   3557 	/*
   3558 	 * Parents of gang children can have two flavors -- ones that
   3559 	 * allocated the gang header (will have ZIO_FLAG_IO_REWRITE set)
   3560 	 * and ones that allocated the constituent blocks. The allocation
   3561 	 * throttle needs to know the allocating parent zio so we must find
   3562 	 * it here.
   3563 	 */
   3564 	if (pio->io_child_type == ZIO_CHILD_GANG) {
   3565 		/*
   3566 		 * If our parent is a rewrite gang child then our grandparent
   3567 		 * would have been the one that performed the allocation.
   3568 		 */
   3569 		if (pio->io_flags & ZIO_FLAG_IO_REWRITE)
   3570 			pio = zio_unique_parent(pio);
   3571 		flags |= METASLAB_GANG_CHILD;
   3572 	}
   3573 
   3574 	ASSERT(IO_IS_ALLOCATING(pio));
   3575 	ASSERT3P(zio, !=, zio->io_logical);
   3576 	ASSERT(zio->io_logical != NULL);
   3577 	ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REPAIR));
   3578 	ASSERT0(zio->io_flags & ZIO_FLAG_NOPWRITE);
   3579 
   3580 	mutex_enter(&pio->io_lock);
   3581 	metaslab_group_alloc_decrement(zio->io_spa, vd->vdev_id, pio, flags);
   3582 	mutex_exit(&pio->io_lock);
   3583 
   3584 	metaslab_class_throttle_unreserve(spa_normal_class(zio->io_spa),
   3585 	    1, pio);
   3586 
   3587 	/*
   3588 	 * Call into the pipeline to see if there is more work that
   3589 	 * needs to be done. If there is work to be done it will be
   3590 	 * dispatched to another taskq thread.
   3591 	 */
   3592 	zio_allocate_dispatch(zio->io_spa);
   3593 }
   3594 
/*
 * Completion stage of the pipeline (the last entry in zio_pipeline[]).
 *
 * In order, this function:
 *   - stops until children of every type have reached ZIO_WAIT_DONE;
 *   - updates and verifies the allocation throttle accounting;
 *   - inherits vdev/gang/ddt child errors, finishes checksum reports,
 *     pops transforms and updates vdev statistics;
 *   - posts ereports and decides whether a failed logical zio should be
 *     reexecuted or suspended;
 *   - either hands the zio off for reexecution/suspension, or runs the
 *     io_done callback, detaches from and notifies every parent, and then
 *     wakes the waiter or destroys the zio.
 *
 * Every return path yields ZIO_PIPELINE_STOP.
 */
static int
zio_done(zio_t *zio)
{
	spa_t *spa = zio->io_spa;
	zio_t *lio = zio->io_logical;
	blkptr_t *bp = zio->io_bp;
	vdev_t *vd = zio->io_vd;
	uint64_t psize = zio->io_size;
	zio_t *pio, *pio_next;
	metaslab_class_t *mc = spa_normal_class(spa);
	zio_link_t *zl = NULL;

	/*
	 * If our children haven't all completed,
	 * wait for them and then repeat this pipeline stage.
	 */
	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE) ||
	    zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE) ||
	    zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE) ||
	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_DONE))
		return (ZIO_PIPELINE_STOP);

	/*
	 * If the allocation throttle is enabled, then update the accounting.
	 * We only track child I/Os that are part of an allocating async
	 * write. We must do this since the allocation is performed
	 * by the logical I/O but the actual write is done by child I/Os.
	 */
	if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING &&
	    zio->io_child_type == ZIO_CHILD_VDEV) {
		ASSERT(mc->mc_alloc_throttle_enabled);
		zio_dva_throttle_done(zio);
	}

	/*
	 * If the allocation throttle is enabled, verify that
	 * we have decremented the refcounts for every I/O that was throttled.
	 */
	if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
		ASSERT(zio->io_type == ZIO_TYPE_WRITE);
		ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
		ASSERT(bp != NULL);
		metaslab_group_alloc_verify(spa, zio->io_bp, zio);
		VERIFY(refcount_not_held(&mc->mc_alloc_slots, zio));
	}

	/* Every per-type, per-wait child counter must be zero by now. */
	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
			ASSERT(zio->io_children[c][w] == 0);

	/*
	 * Sanity checks on a non-embedded block pointer: the padding words
	 * are zero, and the bp either still matches the copy saved in
	 * io_bp_copy or is the unique parent's bp.
	 */
	if (bp != NULL && !BP_IS_EMBEDDED(bp)) {
		ASSERT(bp->blk_pad[0] == 0);
		ASSERT(bp->blk_pad[1] == 0);
		ASSERT(bcmp(bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
		    (bp == zio_unique_parent(zio)->io_bp));
		if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(bp) &&
		    zio->io_bp_override == NULL &&
		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
			ASSERT(!BP_SHOULD_BYTESWAP(bp));
			ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(bp));
			ASSERT(BP_COUNT_GANG(bp) == 0 ||
			    (BP_COUNT_GANG(bp) == BP_GET_NDVAS(bp)));
		}
		/* A nop-write must leave the block pointer unchanged. */
		if (zio->io_flags & ZIO_FLAG_NOPWRITE)
			VERIFY(BP_EQUAL(bp, &zio->io_bp_orig));
	}

	/*
	 * If there were child vdev/gang/ddt errors, they apply to us now.
	 */
	zio_inherit_child_errors(zio, ZIO_CHILD_VDEV);
	zio_inherit_child_errors(zio, ZIO_CHILD_GANG);
	zio_inherit_child_errors(zio, ZIO_CHILD_DDT);

	/*
	 * If the I/O on the transformed data was successful, generate any
	 * checksum reports now while we still have the transformed data.
	 */
	if (zio->io_error == 0) {
		while (zio->io_cksum_report != NULL) {
			zio_cksum_report_t *zcr = zio->io_cksum_report;
			uint64_t align = zcr->zcr_align;
			uint64_t asize = P2ROUNDUP(psize, align);
			char *abuf = zio->io_data;

			/*
			 * When psize is not a multiple of the report's
			 * alignment, hand the finish callback a zero-padded
			 * temporary copy of the data instead.
			 */
			if (asize != psize) {
				abuf = zio_buf_alloc(asize);
				bcopy(zio->io_data, abuf, psize);
				bzero(abuf + psize, asize - psize);
			}

			/* Unlink the report from the list before finishing. */
			zio->io_cksum_report = zcr->zcr_next;
			zcr->zcr_next = NULL;
			zcr->zcr_finish(zcr, abuf);
			zfs_ereport_free_checksum(zcr);

			if (asize != psize)
				zio_buf_free(abuf, asize);
		}
	}

	zio_pop_transforms(zio);	/* note: may set zio->io_error */

	vdev_stat_update(zio, psize);

	if (zio->io_error) {
		/*
		 * If this I/O is attached to a particular vdev,
		 * generate an error message describing the I/O failure
		 * at the block level.  We ignore these errors if the
		 * device is currently unavailable.
		 */
		if (zio->io_error != ECKSUM && vd != NULL && !vdev_is_dead(vd))
			zfs_ereport_post(FM_EREPORT_ZFS_IO, spa, vd, zio, 0, 0);

		if ((zio->io_error == EIO || !(zio->io_flags &
		    (ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_PROPAGATE))) &&
		    zio == lio) {
			/*
			 * For logical I/O requests, tell the SPA to log the
			 * error and generate a logical data ereport.
			 */
			spa_log_error(spa, zio);
			zfs_ereport_post(FM_EREPORT_ZFS_DATA, spa, NULL, zio,
			    0, 0);
		}
	}

	if (zio->io_error && zio == lio) {
		/*
		 * Determine whether zio should be reexecuted.  This will
		 * propagate all the way to the root via zio_notify_parent().
		 */
		ASSERT(vd == NULL && bp != NULL);
		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);

		/*
		 * A failed allocating zio that is not allowed to fail is
		 * retried immediately, unless the error is ENOSPC, in which
		 * case it is suspended.
		 */
		if (IO_IS_ALLOCATING(zio) &&
		    !(zio->io_flags & ZIO_FLAG_CANFAIL)) {
			if (zio->io_error != ENOSPC)
				zio->io_reexecute |= ZIO_REEXECUTE_NOW;
			else
				zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
		}

		/*
		 * A read or free that failed with ENXIO outside the scan
		 * thread is suspended, provided no pool load is in progress
		 * and the failure mode is not "continue".
		 */
		if ((zio->io_type == ZIO_TYPE_READ ||
		    zio->io_type == ZIO_TYPE_FREE) &&
		    !(zio->io_flags & ZIO_FLAG_SCAN_THREAD) &&
		    zio->io_error == ENXIO &&
		    spa_load_state(spa) == SPA_LOAD_NONE &&
		    spa_get_failmode(spa) != ZIO_FAILURE_MODE_CONTINUE)
			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;

		/* Anything else that must not fail is suspended as well. */
		if (!(zio->io_flags & ZIO_FLAG_CANFAIL) && !zio->io_reexecute)
			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;

		/*
		 * Here is a possibly good place to attempt to do
		 * either combinatorial reconstruction or error correction
		 * based on checksums.  It also might be a good place
		 * to send out preliminary ereports before we suspend
		 * processing.
		 */
	}

	/*
	 * If there were logical child errors, they apply to us now.
	 * We defer this until now to avoid conflating logical child
	 * errors with errors that happened to the zio itself when
	 * updating vdev stats and reporting FMA events above.
	 */
	zio_inherit_child_errors(zio, ZIO_CHILD_LOGICAL);

	/*
	 * A failed or to-be-reexecuted allocating gang leader gives back
	 * what it allocated, unless it was a rewrite or a nop-write.
	 */
	if ((zio->io_error || zio->io_reexecute) &&
	    IO_IS_ALLOCATING(zio) && zio->io_gang_leader == zio &&
	    !(zio->io_flags & (ZIO_FLAG_IO_REWRITE | ZIO_FLAG_NOPWRITE)))
		zio_dva_unallocate(zio, zio->io_gang_tree, bp);

	zio_gang_tree_free(&zio->io_gang_tree);

	/*
	 * Godfather I/Os should never suspend.
	 */
	if ((zio->io_flags & ZIO_FLAG_GODFATHER) &&
	    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND))
		zio->io_reexecute = 0;

	if (zio->io_reexecute) {
		/*
		 * This is a logical I/O that wants to reexecute.
		 *
		 * Reexecute is top-down.  When an i/o fails, if it's not
		 * the root, it simply notifies its parent and sticks around.
		 * The parent, seeing that it still has children in zio_done(),
		 * does the same.  This percolates all the way up to the root.
		 * The root i/o will reexecute or suspend the entire tree.
		 *
		 * This approach ensures that zio_reexecute() honors
		 * all the original i/o dependency relationships, e.g.
		 * parents not executing until children are ready.
		 */
		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);

		zio->io_gang_leader = NULL;

		mutex_enter(&zio->io_lock);
		zio->io_state[ZIO_WAIT_DONE] = 1;
		mutex_exit(&zio->io_lock);

		/*
		 * "The Godfather" I/O monitors its children but is
		 * not a true parent to them. It will track them through
		 * the pipeline but severs its ties whenever they get into
		 * trouble (e.g. suspended). This allows "The Godfather"
		 * I/O to return status without blocking.
		 */
		zl = NULL;
		for (pio = zio_walk_parents(zio, &zl); pio != NULL;
		    pio = pio_next) {
			/*
			 * Remember the link for this parent and fetch the
			 * next parent before the link may be removed.
			 */
			zio_link_t *remove_zl = zl;
			pio_next = zio_walk_parents(zio, &zl);

			if ((pio->io_flags & ZIO_FLAG_GODFATHER) &&
			    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND)) {
				zio_remove_child(pio, zio, remove_zl);
				zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
			}
		}

		if ((pio = zio_unique_parent(zio)) != NULL) {
			/*
			 * We're not a root i/o, so there's nothing to do
			 * but notify our parent.  Don't propagate errors
			 * upward since we haven't permanently failed yet.
			 */
			ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
			zio->io_flags |= ZIO_FLAG_DONT_PROPAGATE;
			zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
		} else if (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND) {
			/*
			 * We'd fail again if we reexecuted now, so suspend
			 * until conditions improve (e.g. device comes online).
			 */
			zio_suspend(spa, zio);
		} else {
			/*
			 * Reexecution is potentially a huge amount of work.
			 * Hand it off to the otherwise-unused claim taskq.
			 */
			/*
			 * The embedded taskq entry must be idle before it is
			 * reused; the field that shows this differs per
			 * platform.
			 */
#if defined(illumos) || !defined(_KERNEL)
			ASSERT(zio->io_tqent.tqent_next == NULL);
#elif defined(__NetBSD__)
			ASSERT(zio->io_tqent.tqent_queued == 0);
#else
			ASSERT(zio->io_tqent.tqent_task.ta_pending == 0);
#endif
			spa_taskq_dispatch_ent(spa, ZIO_TYPE_CLAIM,
			    ZIO_TASKQ_ISSUE, (task_func_t *)zio_reexecute, zio,
			    0, &zio->io_tqent);
		}
		return (ZIO_PIPELINE_STOP);
	}

	/* From here on the zio is finishing for good. */
	ASSERT(zio->io_child_count == 0);
	ASSERT(zio->io_reexecute == 0);
	ASSERT(zio->io_error == 0 || (zio->io_flags & ZIO_FLAG_CANFAIL));

	/*
	 * Report any checksum errors, since the I/O is complete.
	 */
	while (zio->io_cksum_report != NULL) {
		zio_cksum_report_t *zcr = zio->io_cksum_report;
		zio->io_cksum_report = zcr->zcr_next;
		zcr->zcr_next = NULL;
		zcr->zcr_finish(zcr, NULL);
		zfs_ereport_free_checksum(zcr);
	}

	/*
	 * It is the responsibility of the done callback to ensure that this
	 * particular zio is no longer discoverable for adoption, and as
	 * such, cannot acquire any new parents.
	 */
	if (zio->io_done)
		zio->io_done(zio);

	mutex_enter(&zio->io_lock);
	zio->io_state[ZIO_WAIT_DONE] = 1;
	mutex_exit(&zio->io_lock);

	/* Detach from every parent and tell each one that we are done. */
	zl = NULL;
	for (pio = zio_walk_parents(zio, &zl); pio != NULL; pio = pio_next) {
		zio_link_t *remove_zl = zl;
		pio_next = zio_walk_parents(zio, &zl);
		zio_remove_child(pio, zio, remove_zl);
		zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
	}

	/*
	 * If a waiter is registered, clear io_executor and wake it through
	 * io_cv; the zio is not destroyed on this path.  Otherwise nobody
	 * is waiting, so destroy the zio here.
	 */
	if (zio->io_waiter != NULL) {
		mutex_enter(&zio->io_lock);
		zio->io_executor = NULL;
		cv_broadcast(&zio->io_cv);
		mutex_exit(&zio->io_lock);
	} else {
		zio_destroy(zio);
	}

	return (ZIO_PIPELINE_STOP);
}
   3903 
   3904 /*
   3905  * ==========================================================================
   3906  * I/O pipeline definition
   3907  * ==========================================================================
   3908  */
/*
 * Table of pipeline stage handlers.  The first slot is NULL and the last
 * slot is zio_done().
 *
 * NOTE(review): the position of each entry presumably has to match the
 * stage numbering declared elsewhere (likely sys/zio_impl.h) -- confirm
 * before inserting, removing or reordering entries.
 */
static zio_pipe_stage_t *zio_pipeline[] = {
	NULL,
	zio_read_bp_init,
	zio_write_bp_init,
	zio_free_bp_init,
	zio_issue_async,
	zio_write_compress,
	zio_checksum_generate,
	zio_nop_write,
	zio_ddt_read_start,
	zio_ddt_read_done,
	zio_ddt_write,
	zio_ddt_free,
	zio_gang_assemble,
	zio_gang_issue,
	zio_dva_throttle,
	zio_dva_allocate,
	zio_dva_free,
	zio_dva_claim,
	zio_ready,
	zio_vdev_io_start,
	zio_vdev_io_done,
	zio_vdev_io_assess,
	zio_checksum_verify,
	zio_done
};
   3935 
   3936 
   3937 
   3938 
   3939 /*
   3940  * Compare two zbookmark_phys_t's to see which we would reach first in a
   3941  * pre-order traversal of the object tree.
   3942  *
   3943  * This is simple in every case aside from the meta-dnode object. For all other
   3944  * objects, we traverse them in order (object 1 before object 2, and so on).
   3945  * However, all of these objects are traversed while traversing object 0, since
   3946  * the data it points to is the list of objects.  Thus, we need to convert to a
   3947  * canonical representation so we can compare meta-dnode bookmarks to
   3948  * non-meta-dnode bookmarks.
   3949  *
   3950  * We do this by calculating "equivalents" for each field of the zbookmark.
   3951  * zbookmarks outside of the meta-dnode use their own object and level, and
   3952  * calculate the level 0 equivalent (the first L0 blkid that is contained in the
   3953  * blocks this bookmark refers to) by multiplying their blkid by their span
   3954  * (the number of L0 blocks contained within one block at their level).
   3955  * zbookmarks inside the meta-dnode calculate their object equivalent
   3956  * (which is L0equiv * dnodes per data block), use 0 for their L0equiv, and use
   3957  * level + 1<<31 (any value larger than a level could ever be) for their level.
   3958  * This causes them to always compare before a bookmark in their object
   3959  * equivalent, compare appropriately to bookmarks in other objects, and to
   3960  * compare appropriately to other bookmarks in the meta-dnode.
   3961  */
   3962 int
   3963 zbookmark_compare(uint16_t dbss1, uint8_t ibs1, uint16_t dbss2, uint8_t ibs2,
   3964     const zbookmark_phys_t *zb1, const zbookmark_phys_t *zb2)
   3965 {
   3966 	/*
   3967 	 * These variables represent the "equivalent" values for the zbookmark,
   3968 	 * after converting zbookmarks inside the meta dnode to their
   3969 	 * normal-object equivalents.
   3970 	 */
   3971 	uint64_t zb1obj, zb2obj;
   3972 	uint64_t zb1L0, zb2L0;
   3973 	uint64_t zb1level, zb2level;
   3974 
   3975 	if (zb1->zb_object == zb2->zb_object &&
   3976 	    zb1->zb_level == zb2->zb_level &&
   3977 	    zb1->zb_blkid == zb2->zb_blkid)
   3978 		return (0);
   3979 
   3980 	/*
   3981 	 * BP_SPANB calculates the span in blocks.
   3982 	 */
   3983 	zb1L0 = (zb1->zb_blkid) * BP_SPANB(ibs1, zb1->zb_level);
   3984 	zb2L0 = (zb2->zb_blkid) * BP_SPANB(ibs2, zb2->zb_level);
   3985 
   3986 	if (zb1->zb_object == DMU_META_DNODE_OBJECT) {
   3987 		zb1obj = zb1L0 * (dbss1 << (SPA_MINBLOCKSHIFT - DNODE_SHIFT));
   3988 		zb1L0 = 0;
   3989 		zb1level = zb1->zb_level + COMPARE_META_LEVEL;
   3990 	} else {
   3991 		zb1obj = zb1->zb_object;
   3992 		zb1level = zb1->zb_level;
   3993 	}
   3994 
   3995 	if (zb2->zb_object == DMU_META_DNODE_OBJECT) {
   3996 		zb2obj = zb2L0 * (dbss2 << (SPA_MINBLOCKSHIFT - DNODE_SHIFT));
   3997 		zb2L0 = 0;
   3998 		zb2level = zb2->zb_level + COMPARE_META_LEVEL;
   3999 	} else {
   4000 		zb2obj = zb2->zb_object;
   4001 		zb2level = zb2->zb_level;
   4002 	}
   4003 
   4004 	/* Now that we have a canonical representation, do the comparison. */
   4005 	if (zb1obj != zb2obj)
   4006 		return (zb1obj < zb2obj ? -1 : 1);
   4007 	else if (zb1L0 != zb2L0)
   4008 		return (zb1L0 < zb2L0 ? -1 : 1);
   4009 	else if (zb1level != zb2level)
   4010 		return (zb1level > zb2level ? -1 : 1);
   4011 	/*
   4012 	 * This can (theoretically) happen if the bookmarks have the same object
   4013 	 * and level, but different blkids, if the block sizes are not the same.
   4014 	 * There is presently no way to change the indirect block sizes
   4015 	 */
   4016 	return (0);
   4017 }
   4018 
   4019 /*
   4020  *  This function checks the following: given that last_block is the place that
   4021  *  our traversal stopped last time, does that guarantee that we've visited
   4022  *  every node under subtree_root?  Therefore, we can't just use the raw output
   4023  *  of zbookmark_compare.  We have to pass in a modified version of
   4024  *  subtree_root; by incrementing the block id, and then checking whether
   4025  *  last_block is before or equal to that, we can tell whether or not having
   4026  *  visited last_block implies that all of subtree_root's children have been
   4027  *  visited.
   4028  */
   4029 boolean_t
   4030 zbookmark_subtree_completed(const dnode_phys_t *dnp,
   4031     const zbookmark_phys_t *subtree_root, const zbookmark_phys_t *last_block)
   4032 {
   4033 	zbookmark_phys_t mod_zb = *subtree_root;
   4034 	mod_zb.zb_blkid++;
   4035 	ASSERT(last_block->zb_level == 0);
   4036 
   4037 	/* The objset_phys_t isn't before anything. */
   4038 	if (dnp == NULL)
   4039 		return (B_FALSE);
   4040 
   4041 	/*
   4042 	 * We pass in 1ULL << (DNODE_BLOCK_SHIFT - SPA_MINBLOCKSHIFT) for the
   4043 	 * data block size in sectors, because that variable is only used if
   4044 	 * the bookmark refers to a block in the meta-dnode.  Since we don't
   4045 	 * know without examining it what object it refers to, and there's no
   4046 	 * harm in passing in this value in other cases, we always pass it in.
   4047 	 *
   4048 	 * We pass in 0 for the indirect block size shift because zb2 must be
   4049 	 * level 0.  The indirect block size is only used to calculate the span
   4050 	 * of the bookmark, but since the bookmark must be level 0, the span is
   4051 	 * always 1, so the math works out.
   4052 	 *
   4053 	 * If you make changes to how the zbookmark_compare code works, be sure
   4054 	 * to make sure that this code still works afterwards.
   4055 	 */
   4056 	return (zbookmark_compare(dnp->dn_datablkszsec, dnp->dn_indblkshift,
   4057 	    1ULL << (DNODE_BLOCK_SHIFT - SPA_MINBLOCKSHIFT), 0, &mod_zb,
   4058 	    last_block) <= 0);
   4059 }
   4060