/*	$NetBSD: vfs_wapbl.c,v 1.115 2024/12/07 02:23:09 riastradh Exp $	*/

/*-
 * Copyright (c) 2003, 2008, 2009 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Wasabi Systems, Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * This implements file-system-independent write-ahead logging (WAPBL).
 */

#define WAPBL_INTERNAL

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: vfs_wapbl.c,v 1.115 2024/12/07 02:23:09 riastradh Exp $");

#include <sys/param.h>
#include <sys/types.h>

#include <sys/bitops.h>
#include <sys/time.h>
#include <sys/wapbl.h>
#include <sys/wapbl_replay.h>

#ifdef _KERNEL

#include <sys/atomic.h>
#include <sys/conf.h>
#include <sys/evcnt.h>
#include <sys/file.h>
#include <sys/kauth.h>
#include <sys/kernel.h>
#include <sys/module.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/resourcevar.h>
#include <sys/sysctl.h>
#include <sys/uio.h>
#include <sys/vnode.h>

#include <miscfs/specfs/specdev.h>

#define	wapbl_alloc(s)		kmem_alloc((s), KM_SLEEP)
#define	wapbl_free(a, s)	kmem_free((a), (s))
#define	wapbl_calloc(n, s)	kmem_zalloc((n)*(s), KM_SLEEP)

static int wapbl_flush_disk_cache = 1;
static int wapbl_verbose_commit = 0;
static int wapbl_allow_dpofua = 0;	/* switched off by default for now */
static int wapbl_journal_iobufs = 4;

static inline size_t wapbl_space_free(size_t, off_t, off_t);

#else /* !_KERNEL */

#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define	KDASSERT(x)		assert(x)
#define	KASSERT(x)		assert(x)
#define	wapbl_alloc(s)		malloc(s)
#define	wapbl_free(a, s)	free(a)
#define	wapbl_calloc(n, s)	calloc((n), (s))

#endif /* !_KERNEL */

/*
 * INTERNAL DATA STRUCTURES
 */

/*
 * This structure holds per-mount log information.
 *
 * Legend:	a = atomic access only
 *		r = read-only after init
 *		l = rwlock held
 *		m = mutex held
 *		lm = rwlock held writing or mutex held
 *		u = unlocked access ok
 *		b = bufcache_lock held
 */
LIST_HEAD(wapbl_ino_head, wapbl_ino);
struct wapbl {
	struct vnode *wl_logvp;	/* r:	log here */
	struct vnode *wl_devvp;	/* r:	log on this device */
	struct mount *wl_mount;	/* r:	mountpoint wl is associated with */
	daddr_t wl_logpbn;	/* r:	Physical block number of start of log */
	int wl_log_dev_bshift;	/* r:	log2 of device block size of log
					device */
	int wl_fs_dev_bshift;	/* r:	log2 of device block size of
					filesystem device */

	unsigned wl_lock_count;	/* m:	Count of transactions in progress */

	size_t wl_circ_size;	/* r:	Number of bytes in buffer of log */
	size_t wl_circ_off;	/* r:	Number of bytes reserved at start */

	size_t wl_bufcount_max;	/* r:	Number of buffers reserved for log */
	size_t wl_bufbytes_max;	/* r:	Number of buf bytes reserved for log */

	off_t wl_head;		/* l:	Byte offset of log head */
	off_t wl_tail;		/* l:	Byte offset of log tail */
	/*
	 * WAPBL log layout, stored on wl_devvp at wl_logpbn:
	 *
	 *  ___________________ wl_circ_size __________________
	 * /                                                   \
	 * +---------+---------+-------+--------------+--------+
	 * [ commit0 | commit1 | CCWCW | EEEEEEEEEEEE | CCCWCW ]
	 * +---------+---------+-------+--------------+--------+
	 *       wl_circ_off --^       ^-- wl_head    ^-- wl_tail
	 *
	 * commit0 and commit1 are commit headers.  A commit header has
	 * a generation number, indicating which of the two headers is
	 * more recent, and an assignment of head and tail pointers.
	 * The rest is a circular queue of log records, starting at
	 * the byte offset wl_circ_off.
	 *
	 * E marks empty space for records.
	 * W marks records for block writes issued but waiting.
	 * C marks completed records.
	 *
	 * wapbl_flush writes new records to empty `E' spaces after
	 * wl_head from the current transaction in memory.
	 *
	 * wapbl_truncate advances wl_tail past any completed `C'
	 * records, freeing them up for use.
	 *
	 * head == tail == 0 means log is empty.
	 * head == tail != 0 means log is full.
	 * (A worked example of these states follows this structure
	 * definition.)
	 *
	 * See assertions in wapbl_advance() for other boundary
	 * conditions.
	 *
	 * Only wapbl_flush moves the head, except when wapbl_truncate
	 * sets it to 0 to indicate that the log is empty.
	 *
	 * Only wapbl_truncate moves the tail, except when wapbl_flush
	 * sets it to wl_circ_off to indicate that the log is full.
	 */

	struct wapbl_wc_header *wl_wc_header;	/* l	*/
	void *wl_wc_scratch;	/* l:	scratch space (XXX: why?!?) */

	kmutex_t wl_mtx;	/* u:	short-term lock */
	krwlock_t wl_rwlock;	/* u:	File system transaction lock */

	/*
	 * wl_mtx must be held while accessing wl_bufcount, wl_bufs,
	 * or the head and tail offsets.
	 */

#ifdef _KERNEL
	/*
	 * Callback called from within the flush routine to flush any extra
	 * bits.  Note that flush may be skipped without calling this if
	 * there are no outstanding buffers in the transaction.
	 */
	wapbl_flush_fn_t wl_flush;	/* r	*/
	wapbl_flush_fn_t wl_flush_abort;/* r	*/

	/* Event counters */
	char wl_ev_group[EVCNT_STRING_MAX];	/* r	*/
	struct evcnt wl_ev_commit;		/* l	*/
	struct evcnt wl_ev_journalwrite;	/* l	*/
	struct evcnt wl_ev_jbufs_bio_nowait;	/* l	*/
	struct evcnt wl_ev_metawrite;		/* lm	*/
	struct evcnt wl_ev_cacheflush;		/* l	*/
#endif

	size_t wl_bufbytes;	/* m:	Byte count of pages in wl_bufs */
	size_t wl_bufcount;	/* m:	Count of buffers in wl_bufs */
	size_t wl_bcount;	/* m:	Total bcount of wl_bufs */

	TAILQ_HEAD(, buf) wl_bufs; /* m: Buffers in current transaction */

	kcondvar_t wl_reclaimable_cv;	/* m (obviously) */
	size_t wl_reclaimable_bytes; /* m:	Amount of space available for
						reclamation by truncate */
	int wl_error_count;	/* m:	# of wl_entries with errors */
	size_t wl_reserved_bytes; /* never truncate log smaller than this */

#ifdef WAPBL_DEBUG_BUFBYTES
	size_t wl_unsynced_bufbytes; /* Byte count of unsynced buffers */
#endif

#ifdef _KERNEL
	int wl_brperjblock;	/* r:	Block records per journal block */
#endif

	TAILQ_HEAD(, wapbl_dealloc) wl_dealloclist;	/* lm:	list head */
	int wl_dealloccnt;				/* lm:	total count */
	int wl_dealloclim;				/* r:	max count */

	/* hashtable of inode numbers for allocated but unlinked inodes */
	/* synch ??? */
	struct wapbl_ino_head *wl_inohash;
	u_long wl_inohashmask;
	int wl_inohashcnt;

	SIMPLEQ_HEAD(, wapbl_entry) wl_entries; /* m: On disk transaction
						   accounting */

	/* buffers for wapbl_buffered_write() */
	TAILQ_HEAD(, buf) wl_iobufs;		/* l: Free or filling bufs */
	TAILQ_HEAD(, buf) wl_iobufs_busy;	/* l: In-transit bufs */

	int wl_dkcache;		/* r:	disk cache flags */
#define WAPBL_USE_FUA(wl)	\
		(wapbl_allow_dpofua && ISSET((wl)->wl_dkcache, DKCACHE_FUA))
#define WAPBL_JFLAGS(wl)	\
		(WAPBL_USE_FUA(wl) ? (wl)->wl_jwrite_flags : 0)
#define WAPBL_JDATA_FLAGS(wl)	\
		(WAPBL_JFLAGS(wl) & B_MEDIA_DPO)	/* only DPO */
	int wl_jwrite_flags;	/* r:	journal write flags */
};
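
/*
 * Illustrative example of the head/tail states described above (a
 * sketch only; wl_circ_off = 1024 and wl_circ_size = 8192 are assumed
 * values, offsets live in [1024, 9216)):
 *
 *	head == 0,    tail == 0		log empty, all 8192 bytes free
 *	head == 2048, tail == 1024	1024 bytes of records in the log
 *	head == 1536, tail == 8704	records wrap past the end of the area
 *	head == tail == 1024 (!= 0)	log full
 */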

#ifdef WAPBL_DEBUG_PRINT
int wapbl_debug_print = WAPBL_DEBUG_PRINT;
#endif

/****************************************************************/
#ifdef _KERNEL

#ifdef WAPBL_DEBUG
struct wapbl *wapbl_debug_wl;
#endif

static int wapbl_write_commit(struct wapbl *, off_t, off_t);
static int wapbl_write_blocks(struct wapbl *, off_t *);
static int wapbl_write_revocations(struct wapbl *, off_t *);
static int wapbl_write_inodes(struct wapbl *, off_t *);
#endif /* _KERNEL */

static int wapbl_replay_process(struct wapbl_replay *, off_t, off_t);

static inline size_t wapbl_space_used(size_t, off_t, off_t);

#ifdef _KERNEL

static struct pool wapbl_entry_pool;
static struct pool wapbl_dealloc_pool;

#define	WAPBL_INODETRK_SIZE 83
static int wapbl_ino_pool_refcount;
static struct pool wapbl_ino_pool;
struct wapbl_ino {
	LIST_ENTRY(wapbl_ino) wi_hash;
	ino_t wi_ino;
	mode_t wi_mode;
};

static void wapbl_inodetrk_init(struct wapbl *wl, u_int size);
static void wapbl_inodetrk_free(struct wapbl *wl);
static struct wapbl_ino *wapbl_inodetrk_get(struct wapbl *wl, ino_t ino);

static size_t wapbl_transaction_len(struct wapbl *wl);
static inline size_t wapbl_transaction_inodes_len(struct wapbl *wl);

static void wapbl_deallocation_free(struct wapbl *, struct wapbl_dealloc *,
    bool);

static void wapbl_evcnt_init(struct wapbl *);
static void wapbl_evcnt_free(struct wapbl *);

static void wapbl_dkcache_init(struct wapbl *);

#if 0
int wapbl_replay_verify(struct wapbl_replay *, struct vnode *);
#endif

static int wapbl_replay_isopen1(struct wapbl_replay *);

const struct wapbl_ops wapbl_ops = {
	.wo_wapbl_discard	= wapbl_discard,
	.wo_wapbl_replay_isopen	= wapbl_replay_isopen1,
	.wo_wapbl_replay_can_read = wapbl_replay_can_read,
	.wo_wapbl_replay_read	= wapbl_replay_read,
	.wo_wapbl_add_buf	= wapbl_add_buf,
	.wo_wapbl_remove_buf	= wapbl_remove_buf,
	.wo_wapbl_resize_buf	= wapbl_resize_buf,
	.wo_wapbl_begin		= wapbl_begin,
	.wo_wapbl_end		= wapbl_end,
	.wo_wapbl_junlock_assert= wapbl_junlock_assert,
	.wo_wapbl_jlock_assert	= wapbl_jlock_assert,

	/* XXX: the following is only used to say "this is a wapbl buf" */
	.wo_wapbl_biodone	= wapbl_biodone,
};

SYSCTL_SETUP(wapbl_sysctl_init, "wapbl sysctl")
{
	int rv;
	const struct sysctlnode *rnode, *cnode;

	rv = sysctl_createv(clog, 0, NULL, &rnode,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_NODE, "wapbl",
	    SYSCTL_DESCR("WAPBL journaling options"),
	    NULL, 0, NULL, 0,
	    CTL_VFS, CTL_CREATE, CTL_EOL);
	if (rv)
		return;

	rv = sysctl_createv(clog, 0, &rnode, &cnode,
	    CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
	    CTLTYPE_INT, "flush_disk_cache",
	    SYSCTL_DESCR("flush disk cache"),
	    NULL, 0, &wapbl_flush_disk_cache, 0,
	    CTL_CREATE, CTL_EOL);
	if (rv)
		return;

	rv = sysctl_createv(clog, 0, &rnode, &cnode,
	    CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
	    CTLTYPE_INT, "verbose_commit",
	    SYSCTL_DESCR("show time and size of wapbl log commits"),
	    NULL, 0, &wapbl_verbose_commit, 0,
	    CTL_CREATE, CTL_EOL);
	if (rv)
		return;

	rv = sysctl_createv(clog, 0, &rnode, &cnode,
	    CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
	    CTLTYPE_INT, "allow_dpofua",
	    SYSCTL_DESCR("allow use of FUA/DPO instead of cache flush"
		" if available"),
	    NULL, 0, &wapbl_allow_dpofua, 0,
	    CTL_CREATE, CTL_EOL);
	if (rv)
		return;

	rv = sysctl_createv(clog, 0, &rnode, &cnode,
	    CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
	    CTLTYPE_INT, "journal_iobufs",
	    SYSCTL_DESCR("count of bufs used for journal I/O"
		" (max async count)"),
	    NULL, 0, &wapbl_journal_iobufs, 0,
	    CTL_CREATE, CTL_EOL);
	if (rv)
		return;
}
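
/*
 * The knobs above are exported under vfs.wapbl.  Illustrative usage
 * from userland:
 *
 *	sysctl -w vfs.wapbl.verbose_commit=1
 *	sysctl vfs.wapbl.journal_iobufs
 */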

static void
wapbl_init(void)
{

	pool_init(&wapbl_entry_pool, sizeof(struct wapbl_entry), 0, 0, 0,
	    "wapblentrypl", &pool_allocator_kmem, IPL_VM);
	pool_init(&wapbl_dealloc_pool, sizeof(struct wapbl_dealloc), 0, 0, 0,
	    "wapbldealloc", &pool_allocator_nointr, IPL_NONE);
}

static int
wapbl_fini(void)
{

	pool_destroy(&wapbl_dealloc_pool);
	pool_destroy(&wapbl_entry_pool);

	return 0;
}

static void
wapbl_evcnt_init(struct wapbl *wl)
{

	snprintf(wl->wl_ev_group, sizeof(wl->wl_ev_group),
	    "wapbl fsid 0x%x/0x%x",
	    wl->wl_mount->mnt_stat.f_fsidx.__fsid_val[0],
	    wl->wl_mount->mnt_stat.f_fsidx.__fsid_val[1]);

	evcnt_attach_dynamic(&wl->wl_ev_commit, EVCNT_TYPE_MISC,
	    NULL, wl->wl_ev_group, "commit");
	evcnt_attach_dynamic(&wl->wl_ev_journalwrite, EVCNT_TYPE_MISC,
	    NULL, wl->wl_ev_group, "journal write total");
	evcnt_attach_dynamic(&wl->wl_ev_jbufs_bio_nowait, EVCNT_TYPE_MISC,
	    NULL, wl->wl_ev_group, "journal write finished async");
	evcnt_attach_dynamic(&wl->wl_ev_metawrite, EVCNT_TYPE_MISC,
	    NULL, wl->wl_ev_group, "metadata async write");
	evcnt_attach_dynamic(&wl->wl_ev_cacheflush, EVCNT_TYPE_MISC,
	    NULL, wl->wl_ev_group, "cache flush");
}

static void
wapbl_evcnt_free(struct wapbl *wl)
{

	evcnt_detach(&wl->wl_ev_commit);
	evcnt_detach(&wl->wl_ev_journalwrite);
	evcnt_detach(&wl->wl_ev_jbufs_bio_nowait);
	evcnt_detach(&wl->wl_ev_metawrite);
	evcnt_detach(&wl->wl_ev_cacheflush);
}

static void
wapbl_dkcache_init(struct wapbl *wl)
{
	int error;

	/* Get disk cache flags */
	error = VOP_IOCTL(wl->wl_devvp, DIOCGCACHE, &wl->wl_dkcache,
	    FWRITE, FSCRED);
	if (error) {
		/* behave as if there was a write cache */
		wl->wl_dkcache = DKCACHE_WRITE;
	}

	/* Use FUA instead of cache flush if available */
	if (ISSET(wl->wl_dkcache, DKCACHE_FUA))
		wl->wl_jwrite_flags |= B_MEDIA_FUA;

	/* Use DPO for journal writes if available */
	if (ISSET(wl->wl_dkcache, DKCACHE_DPO))
		wl->wl_jwrite_flags |= B_MEDIA_DPO;
}
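
/*
 * Illustrative effect of the above (values assumed): on a disk whose
 * driver reports DKCACHE_WRITE|DKCACHE_FUA, wl_jwrite_flags becomes
 * B_MEDIA_FUA.  If the administrator also sets vfs.wapbl.allow_dpofua=1,
 * WAPBL_JFLAGS(wl) then applies those flags to journal writes, the
 * intent being to rely on FUA rather than a separate cache flush.
 */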

static int
wapbl_start_flush_inodes(struct wapbl *wl, struct wapbl_replay *wr)
{
	int error, i;

	WAPBL_PRINTF(WAPBL_PRINT_REPLAY,
	    ("wapbl_start: reusing log with %d inodes\n", wr->wr_inodescnt));

	/*
	 * It's only valid to reuse the replay log if it's
	 * the same as the new log we just opened.
	 */
	KDASSERT(!wapbl_replay_isopen(wr));
	KASSERT(wl->wl_devvp->v_type == VBLK);
	KASSERT(wr->wr_devvp->v_type == VBLK);
	KASSERT(wl->wl_devvp->v_rdev == wr->wr_devvp->v_rdev);
	KASSERT(wl->wl_logpbn == wr->wr_logpbn);
	KASSERT(wl->wl_circ_size == wr->wr_circ_size);
	KASSERT(wl->wl_circ_off == wr->wr_circ_off);
	KASSERT(wl->wl_log_dev_bshift == wr->wr_log_dev_bshift);
	KASSERT(wl->wl_fs_dev_bshift == wr->wr_fs_dev_bshift);

	wl->wl_wc_header->wc_generation = wr->wr_generation + 1;

	for (i = 0; i < wr->wr_inodescnt; i++)
		wapbl_register_inode(wl, wr->wr_inodes[i].wr_inumber,
		    wr->wr_inodes[i].wr_imode);

	/* Make sure the new transaction won't overwrite the old inode list */
	KDASSERT(wapbl_transaction_len(wl) <=
	    wapbl_space_free(wl->wl_circ_size, wr->wr_inodeshead,
		wr->wr_inodestail));

	wl->wl_head = wl->wl_tail = wr->wr_inodeshead;
	wl->wl_reclaimable_bytes = wl->wl_reserved_bytes =
	    wapbl_transaction_len(wl);

	error = wapbl_write_inodes(wl, &wl->wl_head);
	if (error)
		return error;

	KASSERT(wl->wl_head != wl->wl_tail);
	KASSERT(wl->wl_head != 0);

	return 0;
}

int
wapbl_start(struct wapbl ** wlp, struct mount *mp, struct vnode *vp,
    daddr_t off, size_t count, size_t blksize, struct wapbl_replay *wr,
    wapbl_flush_fn_t flushfn, wapbl_flush_fn_t flushabortfn)
{
	struct wapbl *wl;
	struct vnode *devvp;
	daddr_t logpbn;
	int error;
	int log_dev_bshift = ilog2(blksize);
	int fs_dev_bshift = log_dev_bshift;
	int run;

	WAPBL_PRINTF(WAPBL_PRINT_OPEN,
	    ("wapbl_start: vp=%p off=%"PRId64" count=%zu blksize=%zu\n",
		vp, off, count, blksize));

	if (log_dev_bshift > fs_dev_bshift) {
		WAPBL_PRINTF(WAPBL_PRINT_OPEN,
		    ("wapbl: log device's block size cannot be larger "
			"than filesystem's\n"));
		/*
		 * Not currently implemented, although it could be if
		 * needed someday.
		 */
		return ENOSYS;
	}

	if (off < 0)
		return EINVAL;

	if (blksize < DEV_BSIZE)
		return EINVAL;
	if (blksize % DEV_BSIZE)
		return EINVAL;

	/* XXXTODO: verify that the full load is writable */

	/*
	 * XXX check for minimum log size
	 * minimum is governed by minimum amount of space
	 * to complete a transaction. (probably truncate)
	 */
	/* XXX for now pick something minimal */
	if ((count * blksize) < MAXPHYS) {
		return ENOSPC;
	}

	if ((error = VOP_BMAP(vp, off, &devvp, &logpbn, &run)) != 0) {
		return error;
	}

	wl = wapbl_calloc(1, sizeof(*wl));
	rw_init(&wl->wl_rwlock);
	mutex_init(&wl->wl_mtx, MUTEX_DEFAULT, IPL_NONE);
	cv_init(&wl->wl_reclaimable_cv, "wapblrec");
	TAILQ_INIT(&wl->wl_bufs);
	SIMPLEQ_INIT(&wl->wl_entries);

	wl->wl_logvp = vp;
	wl->wl_devvp = devvp;
	wl->wl_mount = mp;
	wl->wl_logpbn = logpbn;
	wl->wl_log_dev_bshift = log_dev_bshift;
	wl->wl_fs_dev_bshift = fs_dev_bshift;

	wl->wl_flush = flushfn;
	wl->wl_flush_abort = flushabortfn;

	/* Reserve two log device blocks for the commit headers */
	wl->wl_circ_off = 2<<wl->wl_log_dev_bshift;
	wl->wl_circ_size = ((count * blksize) - wl->wl_circ_off);
	/* truncate the log usage to a multiple of 1 << log_dev_bshift */
	wl->wl_circ_size >>= wl->wl_log_dev_bshift;
	wl->wl_circ_size <<= wl->wl_log_dev_bshift;

	/*
	 * wl_bufbytes_max limits the size of the in-memory transaction space.
	 * - Since buffers are allocated and accounted for in units of
	 *   PAGE_SIZE it is required to be a multiple of PAGE_SIZE
	 *   (i.e. 1<<PAGE_SHIFT)
	 * - Since the log device has to be written in units of
	 *   1<<wl_log_dev_bshift it is required to be a multiple of
	 *   1<<wl_log_dev_bshift.
	 * - Since the filesystem will provide data in units of
	 *   1<<wl_fs_dev_bshift, it is convenient to be a multiple of
	 *   1<<wl_fs_dev_bshift.
	 * Therefore it must be a multiple of the least common multiple of
	 * those three quantities.  Fortunately, all of those quantities are
	 * guaranteed to be a power of two, and the least common multiple of
	 * a set of numbers which are all powers of two is simply the maximum
	 * of those numbers.  Finally, the maximum logarithm of a power of two
	 * is the same as the log of the maximum power of two.  So we can do
	 * the following operations to size wl_bufbytes_max:
	 */

	/* XXX fix actual number of pages reserved per filesystem. */
	wl->wl_bufbytes_max = MIN(wl->wl_circ_size, buf_memcalc() / 2);

	/* Round wl_bufbytes_max down to each power-of-two constraint */
	wl->wl_bufbytes_max >>= PAGE_SHIFT;
	wl->wl_bufbytes_max <<= PAGE_SHIFT;
	wl->wl_bufbytes_max >>= wl->wl_log_dev_bshift;
	wl->wl_bufbytes_max <<= wl->wl_log_dev_bshift;
	wl->wl_bufbytes_max >>= wl->wl_fs_dev_bshift;
	wl->wl_bufbytes_max <<= wl->wl_fs_dev_bshift;
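
	/*
	 * Illustrative example of the rounding above (assumed values):
	 * with PAGE_SHIFT = 12 and both bshifts = 9, a starting value of
	 * 1234567 bytes is rounded down by the first shift pair to
	 * 1232896 (= 301 * 4096); the later pairs leave it unchanged,
	 * since a multiple of 4096 is already a multiple of 512.  Only
	 * the largest constraint actually matters, as noted above.
	 */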

	/* XXX maybe use filesystem fragment size instead of 1024 */
	/* XXX fix actual number of buffers reserved per filesystem. */
	wl->wl_bufcount_max = (buf_nbuf() / 2) * 1024;

	wl->wl_brperjblock = ((1<<wl->wl_log_dev_bshift)
	    - offsetof(struct wapbl_wc_blocklist, wc_blocks)) /
	    sizeof(((struct wapbl_wc_blocklist *)0)->wc_blocks[0]);
	KASSERT(wl->wl_brperjblock > 0);

	/* XXX tie this into resource estimation */
	wl->wl_dealloclim = wl->wl_bufbytes_max / mp->mnt_stat.f_bsize / 2;
	TAILQ_INIT(&wl->wl_dealloclist);

	wapbl_inodetrk_init(wl, WAPBL_INODETRK_SIZE);

	wapbl_evcnt_init(wl);

	wapbl_dkcache_init(wl);

	/* Initialize the commit header */
	{
		struct wapbl_wc_header *wc;
		size_t len = 1 << wl->wl_log_dev_bshift;
		wc = wapbl_calloc(1, len);
		wc->wc_type = WAPBL_WC_HEADER;
		wc->wc_len = len;
		wc->wc_circ_off = wl->wl_circ_off;
		wc->wc_circ_size = wl->wl_circ_size;
		/* XXX wc->wc_fsid */
		wc->wc_log_dev_bshift = wl->wl_log_dev_bshift;
		wc->wc_fs_dev_bshift = wl->wl_fs_dev_bshift;
		wl->wl_wc_header = wc;
		wl->wl_wc_scratch = wapbl_alloc(len);
	}

	TAILQ_INIT(&wl->wl_iobufs);
	TAILQ_INIT(&wl->wl_iobufs_busy);
	for (int i = 0; i < wapbl_journal_iobufs; i++) {
		struct buf *bp;

		if ((bp = geteblk(MAXPHYS)) == NULL)
			goto errout;

		mutex_enter(&bufcache_lock);
		mutex_enter(devvp->v_interlock);
		bgetvp(devvp, bp);
		mutex_exit(devvp->v_interlock);
		mutex_exit(&bufcache_lock);

		bp->b_dev = devvp->v_rdev;

		TAILQ_INSERT_TAIL(&wl->wl_iobufs, bp, b_wapbllist);
	}

	/*
	 * If there was an existing set of unlinked but
	 * allocated inodes, preserve it in the new log.
	 */
	if (wr && wr->wr_inodescnt) {
		error = wapbl_start_flush_inodes(wl, wr);
		if (error)
			goto errout;
	}

	error = wapbl_write_commit(wl, wl->wl_head, wl->wl_tail);
	if (error) {
		goto errout;
	}

	*wlp = wl;
#if defined(WAPBL_DEBUG)
	wapbl_debug_wl = wl;
#endif

	return 0;
errout:
	wapbl_discard(wl);
	wapbl_free(wl->wl_wc_scratch, wl->wl_wc_header->wc_len);
	wapbl_free(wl->wl_wc_header, wl->wl_wc_header->wc_len);
	while (!TAILQ_EMPTY(&wl->wl_iobufs)) {
		struct buf *bp;

		bp = TAILQ_FIRST(&wl->wl_iobufs);
		TAILQ_REMOVE(&wl->wl_iobufs, bp, b_wapbllist);
		brelse(bp, BC_INVAL);
	}
	wapbl_inodetrk_free(wl);
	wapbl_free(wl, sizeof(*wl));

	return error;
}

/*
 * Like wapbl_flush, but discards the transaction
 * completely.
 */

void
wapbl_discard(struct wapbl *wl)
{
	struct wapbl_entry *we;
	struct wapbl_dealloc *wd;
	struct buf *bp;
	int i;

	/*
	 * XXX we may consider using upgrade here
	 * if we want to call flush from inside a transaction
	 */
	rw_enter(&wl->wl_rwlock, RW_WRITER);
	wl->wl_flush(wl->wl_mount, TAILQ_FIRST(&wl->wl_dealloclist));

#ifdef WAPBL_DEBUG_PRINT
	{
		pid_t pid = -1;
		lwpid_t lid = -1;
		if (curproc)
			pid = curproc->p_pid;
		if (curlwp)
			lid = curlwp->l_lid;
#ifdef WAPBL_DEBUG_BUFBYTES
		WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
		    ("wapbl_discard: thread %d.%d discarding "
			"transaction\n"
			"\tbufcount=%zu bufbytes=%zu bcount=%zu "
			"deallocs=%d inodes=%d\n"
			"\terrcnt = %u, reclaimable=%zu reserved=%zu "
			"unsynced=%zu\n",
			pid, lid,
			wl->wl_bufcount, wl->wl_bufbytes, wl->wl_bcount,
			wl->wl_dealloccnt, wl->wl_inohashcnt,
			wl->wl_error_count, wl->wl_reclaimable_bytes,
			wl->wl_reserved_bytes,
			wl->wl_unsynced_bufbytes));
		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
			WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
				"error = %d, unsynced = %zu\n",
				we->we_bufcount, we->we_reclaimable_bytes,
				we->we_error, we->we_unsynced_bufbytes));
		}
#else /* !WAPBL_DEBUG_BUFBYTES */
		WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
		    ("wapbl_discard: thread %d.%d discarding transaction\n"
			"\tbufcount=%zu bufbytes=%zu bcount=%zu "
			"deallocs=%d inodes=%d\n"
			"\terrcnt = %u, reclaimable=%zu reserved=%zu\n",
			pid, lid,
			wl->wl_bufcount, wl->wl_bufbytes, wl->wl_bcount,
			wl->wl_dealloccnt, wl->wl_inohashcnt,
			wl->wl_error_count, wl->wl_reclaimable_bytes,
			wl->wl_reserved_bytes));
		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
			WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
				"error = %d\n",
				we->we_bufcount, we->we_reclaimable_bytes,
				we->we_error));
		}
#endif /* !WAPBL_DEBUG_BUFBYTES */
	}
#endif /* WAPBL_DEBUG_PRINT */

	for (i = 0; i <= wl->wl_inohashmask; i++) {
		struct wapbl_ino_head *wih;
		struct wapbl_ino *wi;

		wih = &wl->wl_inohash[i];
		while ((wi = LIST_FIRST(wih)) != NULL) {
			LIST_REMOVE(wi, wi_hash);
			pool_put(&wapbl_ino_pool, wi);
			KASSERT(wl->wl_inohashcnt > 0);
			wl->wl_inohashcnt--;
		}
	}

	/*
	 * Clean the buffer list.
	 */
	mutex_enter(&bufcache_lock);
	mutex_enter(&wl->wl_mtx);
	while ((bp = TAILQ_FIRST(&wl->wl_bufs)) != NULL) {
		if (bbusy(bp, 0, 0, &wl->wl_mtx) == 0) {
			KASSERT(bp->b_flags & B_LOCKED);
			KASSERT(bp->b_oflags & BO_DELWRI);
			/*
			 * The buffer is already on the BQ_LOCKED queue.
			 * It will be unlocked and removed from the
			 * transaction in brelsel().
			 */
			mutex_exit(&wl->wl_mtx);
			bremfree(bp);
			brelsel(bp, BC_INVAL);
			mutex_enter(&wl->wl_mtx);
		}
	}

	/*
	 * Remove references to this wl from wl_entries and free any
	 * entries which no longer have buffers; the others will be
	 * freed in wapbl_biodone() when they no longer have any buffers.
	 */
	while ((we = SIMPLEQ_FIRST(&wl->wl_entries)) != NULL) {
		SIMPLEQ_REMOVE_HEAD(&wl->wl_entries, we_entries);
		/* XXX should we be accumulating wl_error_count
		 * and increasing reclaimable bytes? */
		we->we_wapbl = NULL;
		if (we->we_bufcount == 0) {
#ifdef WAPBL_DEBUG_BUFBYTES
			KASSERT(we->we_unsynced_bufbytes == 0);
#endif
			pool_put(&wapbl_entry_pool, we);
		}
	}

	mutex_exit(&wl->wl_mtx);
	mutex_exit(&bufcache_lock);

	/* Discard list of deallocs */
	while ((wd = TAILQ_FIRST(&wl->wl_dealloclist)) != NULL)
		wapbl_deallocation_free(wl, wd, true);

	/* XXX should we clear wl_reserved_bytes? */

	KASSERT(wl->wl_bufbytes == 0);
	KASSERT(wl->wl_bcount == 0);
	KASSERT(wl->wl_bufcount == 0);
	KASSERT(TAILQ_EMPTY(&wl->wl_bufs));
	KASSERT(SIMPLEQ_EMPTY(&wl->wl_entries));
	KASSERT(wl->wl_inohashcnt == 0);
	KASSERT(TAILQ_EMPTY(&wl->wl_dealloclist));
	KASSERT(wl->wl_dealloccnt == 0);

	rw_exit(&wl->wl_rwlock);
}

int
wapbl_stop(struct wapbl *wl, int force)
{
	int error;

	WAPBL_PRINTF(WAPBL_PRINT_OPEN, ("wapbl_stop called\n"));
	error = wapbl_flush(wl, 1);
	if (error) {
		if (force)
			wapbl_discard(wl);
		else
			return error;
	}

	/* Unlinked inodes persist after a flush */
	if (wl->wl_inohashcnt) {
		if (force) {
			wapbl_discard(wl);
		} else {
			return EBUSY;
		}
	}

	KASSERT(wl->wl_bufbytes == 0);
	KASSERT(wl->wl_bcount == 0);
	KASSERT(wl->wl_bufcount == 0);
	KASSERT(TAILQ_EMPTY(&wl->wl_bufs));
	KASSERT(wl->wl_dealloccnt == 0);
	KASSERT(SIMPLEQ_EMPTY(&wl->wl_entries));
	KASSERT(wl->wl_inohashcnt == 0);
	KASSERT(TAILQ_EMPTY(&wl->wl_dealloclist));
	KASSERT(wl->wl_dealloccnt == 0);
	KASSERT(TAILQ_EMPTY(&wl->wl_iobufs_busy));

	wapbl_free(wl->wl_wc_scratch, wl->wl_wc_header->wc_len);
	wapbl_free(wl->wl_wc_header, wl->wl_wc_header->wc_len);
	while (!TAILQ_EMPTY(&wl->wl_iobufs)) {
		struct buf *bp;

		bp = TAILQ_FIRST(&wl->wl_iobufs);
		TAILQ_REMOVE(&wl->wl_iobufs, bp, b_wapbllist);
		brelse(bp, BC_INVAL);
	}
	wapbl_inodetrk_free(wl);

	wapbl_evcnt_free(wl);

	cv_destroy(&wl->wl_reclaimable_cv);
	mutex_destroy(&wl->wl_mtx);
	rw_destroy(&wl->wl_rwlock);
	wapbl_free(wl, sizeof(*wl));

	return 0;
}

/****************************************************************/
/*
 * Unbuffered disk I/O
 */

static void
wapbl_doio_accounting(struct vnode *devvp, int flags)
{
	struct pstats *pstats = curlwp->l_proc->p_stats;

	if ((flags & (B_WRITE | B_READ)) == B_WRITE) {
		mutex_enter(devvp->v_interlock);
		devvp->v_numoutput++;
		mutex_exit(devvp->v_interlock);
		pstats->p_ru.ru_oublock++;
	} else {
		pstats->p_ru.ru_inblock++;
	}
}

static int
wapbl_doio(void *data, size_t len, struct vnode *devvp, daddr_t pbn, int flags)
{
	struct buf *bp;
	int error;

	KASSERT(devvp->v_type == VBLK);

	wapbl_doio_accounting(devvp, flags);

	bp = getiobuf(devvp, true);
	bp->b_flags = flags;
	bp->b_cflags |= BC_BUSY;	/* mandatory, asserted by biowait() */
	bp->b_dev = devvp->v_rdev;
	bp->b_data = data;
	bp->b_bufsize = bp->b_resid = bp->b_bcount = len;
	bp->b_blkno = pbn;
	BIO_SETPRIO(bp, BPRIO_TIMECRITICAL);

	WAPBL_PRINTF(WAPBL_PRINT_IO,
	    ("wapbl_doio: %s %d bytes at block %"PRId64" on dev 0x%"PRIx64"\n",
		BUF_ISWRITE(bp) ? "write" : "read", bp->b_bcount,
		bp->b_blkno, bp->b_dev));

	VOP_STRATEGY(devvp, bp);

	error = biowait(bp);
	putiobuf(bp);

	if (error) {
		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
		    ("wapbl_doio: %s %zu bytes at block %" PRId64
			" on dev 0x%"PRIx64" failed with error %d\n",
			(((flags & (B_WRITE | B_READ)) == B_WRITE) ?
			    "write" : "read"),
			len, pbn, devvp->v_rdev, error));
	}

	return error;
}

/*
 * wapbl_write(data, len, devvp, pbn)
 *
 *	Synchronously write len bytes from data to physical block pbn
 *	on devvp.
 */
int
wapbl_write(void *data, size_t len, struct vnode *devvp, daddr_t pbn)
{

	return wapbl_doio(data, len, devvp, pbn, B_WRITE);
}

/*
 * wapbl_read(data, len, devvp, pbn)
 *
 *	Synchronously read len bytes into data from physical block pbn
 *	on devvp.
 */
int
wapbl_read(void *data, size_t len, struct vnode *devvp, daddr_t pbn)
{

	return wapbl_doio(data, len, devvp, pbn, B_READ);
}

/****************************************************************/
/*
 * Buffered disk writes -- try to coalesce writes and emit
 * MAXPHYS-aligned blocks.
 */

/*
 * wapbl_buffered_write_async(wl, bp)
 *
 *	Send buffer for asynchronous write.
 */
static void
wapbl_buffered_write_async(struct wapbl *wl, struct buf *bp)
{

	wapbl_doio_accounting(wl->wl_devvp, bp->b_flags);

	KASSERT(TAILQ_FIRST(&wl->wl_iobufs) == bp);
	TAILQ_REMOVE(&wl->wl_iobufs, bp, b_wapbllist);

	bp->b_flags |= B_WRITE;
	bp->b_cflags |= BC_BUSY;	/* mandatory, asserted by biowait() */
	bp->b_oflags = 0;
	bp->b_bcount = bp->b_resid;
	BIO_SETPRIO(bp, BPRIO_TIMECRITICAL);

	VOP_STRATEGY(wl->wl_devvp, bp);

	wl->wl_ev_journalwrite.ev_count++;

	TAILQ_INSERT_TAIL(&wl->wl_iobufs_busy, bp, b_wapbllist);
}

/*
 * wapbl_buffered_flush(wl)
 *
 *	Flush any buffered writes from wapbl_buffered_write.
 */
static int
wapbl_buffered_flush(struct wapbl *wl, bool full)
{
	int error = 0;
	struct buf *bp, *bnext;
	bool only_done = true, found = false;

	/* If there is an outstanding buffered write, send it now. */
	if ((bp = TAILQ_FIRST(&wl->wl_iobufs)) && bp->b_resid > 0)
		wapbl_buffered_write_async(wl, bp);

	/* Wait for I/O to complete. */
again:
	TAILQ_FOREACH_SAFE(bp, &wl->wl_iobufs_busy, b_wapbllist, bnext) {
		if (!full && only_done) {
			/* skip unfinished */
			if (!ISSET(bp->b_oflags, BO_DONE))
				continue;
		}

		if (ISSET(bp->b_oflags, BO_DONE))
			wl->wl_ev_jbufs_bio_nowait.ev_count++;

		TAILQ_REMOVE(&wl->wl_iobufs_busy, bp, b_wapbllist);
		error = biowait(bp);

		/* Reset for reuse. */
		bp->b_blkno = bp->b_resid = bp->b_flags = 0;
		TAILQ_INSERT_TAIL(&wl->wl_iobufs, bp, b_wapbllist);
		found = true;

		if (!full)
			break;
	}

	if (!found && only_done && !TAILQ_EMPTY(&wl->wl_iobufs_busy)) {
		only_done = false;
		goto again;
	}

	return error;
}

/*
 * wapbl_buffered_write(data, len, wl, pbn)
 *
 *	Write len bytes from data to physical block pbn on
 *	wl->wl_devvp.  The write may not complete until
 *	wapbl_buffered_flush.
 */
static int
wapbl_buffered_write(void *data, size_t len, struct wapbl *wl, daddr_t pbn,
    int bflags)
{
	size_t resid;
	struct buf *bp;

again:
	bp = TAILQ_FIRST(&wl->wl_iobufs);

	if (bp == NULL) {
		/* No more buffers, wait for any previous I/O to finish. */
		wapbl_buffered_flush(wl, false);

		bp = TAILQ_FIRST(&wl->wl_iobufs);
		KASSERT(bp != NULL);
	}

	/*
	 * If the write is not adjacent to the buffered data, flush
	 * first.  The disk block address is always valid for a
	 * non-empty buffer.
	 */
	if ((bp->b_resid > 0 && pbn != bp->b_blkno + btodb(bp->b_resid))) {
		wapbl_buffered_write_async(wl, bp);
		goto again;
	}

	/*
	 * If this write goes to an empty buffer, we have to
	 * save the disk block address first.
	 */
	if (bp->b_blkno == 0) {
		bp->b_blkno = pbn;
		bp->b_flags |= bflags;
	}

	/*
	 * Compute the remaining space so that this buffer ends on a
	 * buffer-size boundary.
	 *
	 * This cannot become zero or negative: the buffer would
	 * already have been flushed by the previous call in that case.
	 */
	resid = bp->b_bufsize - dbtob(bp->b_blkno % btodb(bp->b_bufsize)) -
	    bp->b_resid;
	KASSERT(resid > 0);
	KASSERT(dbtob(btodb(resid)) == resid);

	if (len < resid)
		resid = len;

	memcpy((uint8_t *)bp->b_data + bp->b_resid, data, resid);
	bp->b_resid += resid;

	if (len >= resid) {
		/* Just filled the buf, or the data did not fit. */
		wapbl_buffered_write_async(wl, bp);

		data = (uint8_t *)data + resid;
		len -= resid;
		pbn += btodb(resid);

		if (len > 0)
			goto again;
	}

	return 0;
}
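
/*
 * Illustrative example of the coalescing above (values assumed, with
 * DEV_BSIZE = 512): a 512-byte write at pbn 100 followed by one at
 * pbn 101 lands in the same buffer, since 101 == 100 + btodb(512).
 * A later write at pbn 200 is not adjacent, so the partially filled
 * buffer is first pushed out via wapbl_buffered_write_async() and a
 * fresh buffer is started for the new disk address.
 */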

/*
 * wapbl_circ_write(wl, data, len, offp)
 *
 *	Write len bytes from data to the circular queue of wl, starting
 *	at linear byte offset *offp, and returning the new linear byte
 *	offset in *offp.
 *
 *	If the starting linear byte offset precedes wl->wl_circ_off,
 *	the write instead begins at wl->wl_circ_off.  XXX WTF?  This
 *	should be a KASSERT, not a conditional.
 *
 *	The write is buffered in wl and must be flushed with
 *	wapbl_buffered_flush before it will be submitted to the disk.
 */
static int
wapbl_circ_write(struct wapbl *wl, void *data, size_t len, off_t *offp)
{
	size_t slen;
	off_t off = *offp;
	int error;
	daddr_t pbn;

	KDASSERT(((len >> wl->wl_log_dev_bshift) << wl->wl_log_dev_bshift) ==
	    len);

	if (off < wl->wl_circ_off)
		off = wl->wl_circ_off;
	slen = wl->wl_circ_off + wl->wl_circ_size - off;
	if (slen < len) {
		pbn = wl->wl_logpbn + (off >> wl->wl_log_dev_bshift);
#ifdef _KERNEL
		pbn = btodb(pbn << wl->wl_log_dev_bshift);
#endif
		error = wapbl_buffered_write(data, slen, wl, pbn,
		    WAPBL_JDATA_FLAGS(wl));
		if (error)
			return error;
		data = (uint8_t *)data + slen;
		len -= slen;
		off = wl->wl_circ_off;
	}
	pbn = wl->wl_logpbn + (off >> wl->wl_log_dev_bshift);
#ifdef _KERNEL
	pbn = btodb(pbn << wl->wl_log_dev_bshift);
#endif
	error = wapbl_buffered_write(data, len, wl, pbn,
	    WAPBL_JDATA_FLAGS(wl));
	if (error)
		return error;
	off += len;
	if (off >= wl->wl_circ_off + wl->wl_circ_size)
		off = wl->wl_circ_off;
	*offp = off;
	return 0;
}
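
/*
 * Illustrative wraparound (values assumed): with wl_circ_off = 1024
 * and wl_circ_size = 8192, writing a 2048-byte record at *offp = 8704
 * splits it: slen = 1024 + 8192 - 8704 = 512 bytes go at the end of
 * the circular area, the remaining 1536 bytes continue at wl_circ_off,
 * and *offp comes back as 1024 + 1536 = 2560.
 */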

/****************************************************************/
/*
 * WAPBL transactions: entering, adding/removing bufs, and exiting
 */

int
wapbl_begin(struct wapbl *wl, const char *file, int line)
{
	int doflush;
	unsigned lockcount;

	KDASSERT(wl);

	/*
	 * XXX this needs to be made much more sophisticated.
	 * Perhaps each wapbl_begin could reserve a specified
	 * number of buffers and bytes.
	 */
	mutex_enter(&wl->wl_mtx);
	lockcount = wl->wl_lock_count;
	doflush = ((wl->wl_bufbytes + (lockcount * MAXPHYS)) >
		wl->wl_bufbytes_max / 2) ||
	    ((wl->wl_bufcount + (lockcount * 10)) >
		wl->wl_bufcount_max / 2) ||
	    (wapbl_transaction_len(wl) > wl->wl_circ_size / 2) ||
	    (wl->wl_dealloccnt >= (wl->wl_dealloclim / 2));
	mutex_exit(&wl->wl_mtx);

	if (doflush) {
		WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
		    ("force flush lockcnt=%d bufbytes=%zu "
			"(max=%zu) bufcount=%zu (max=%zu) "
			"dealloccnt %d (lim=%d)\n",
			lockcount, wl->wl_bufbytes,
			wl->wl_bufbytes_max, wl->wl_bufcount,
			wl->wl_bufcount_max,
			wl->wl_dealloccnt, wl->wl_dealloclim));
	}

	if (doflush) {
		int error = wapbl_flush(wl, 0);
		if (error)
			return error;
	}

	rw_enter(&wl->wl_rwlock, RW_READER);
	mutex_enter(&wl->wl_mtx);
	wl->wl_lock_count++;
	mutex_exit(&wl->wl_mtx);

#if defined(WAPBL_DEBUG_PRINT)
	WAPBL_PRINTF(WAPBL_PRINT_TRANSACTION,
	    ("wapbl_begin thread %d.%d with bufcount=%zu "
		"bufbytes=%zu bcount=%zu at %s:%d\n",
		curproc->p_pid, curlwp->l_lid, wl->wl_bufcount,
		wl->wl_bufbytes, wl->wl_bcount, file, line));
#endif

	return 0;
}

void
wapbl_end(struct wapbl *wl)
{

#if defined(WAPBL_DEBUG_PRINT)
	WAPBL_PRINTF(WAPBL_PRINT_TRANSACTION,
	    ("wapbl_end thread %d.%d with bufcount=%zu "
		"bufbytes=%zu bcount=%zu\n",
		curproc->p_pid, curlwp->l_lid, wl->wl_bufcount,
		wl->wl_bufbytes, wl->wl_bcount));
#endif

	/*
	 * XXX this could be handled more gracefully, perhaps place
	 * only a partial transaction in the log and allow the
	 * remainder to flush without the protection of the journal.
	 */
	KASSERTMSG((wapbl_transaction_len(wl) <=
		(wl->wl_circ_size - wl->wl_reserved_bytes)),
	    "wapbl_end: current transaction too big to flush");

	mutex_enter(&wl->wl_mtx);
	KASSERT(wl->wl_lock_count > 0);
	wl->wl_lock_count--;
	mutex_exit(&wl->wl_mtx);

	rw_exit(&wl->wl_rwlock);
}
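
/*
 * Sketch of a typical caller (illustrative only; in-tree file systems
 * such as ffs reach these through the wapbl_ops vector, e.g. via the
 * UFS_WAPBL_BEGIN()/UFS_WAPBL_END() macros):
 */
#if 0
	int error;

	error = wapbl_begin(wl, __FILE__, __LINE__);
	if (error)
		return error;
	/*
	 * ... modify metadata; dirty buffers join the transaction via
	 * wapbl_add_buf(), normally called from bdwrite() ...
	 */
	wapbl_end(wl);
#endif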

void
wapbl_add_buf(struct wapbl *wl, struct buf * bp)
{

	KASSERT(bp->b_cflags & BC_BUSY);
	KASSERT(bp->b_vp);

	wapbl_jlock_assert(wl);

#if 0
	/*
	 * XXX this might be an issue for swapfiles.
	 * see uvm_swap.c:1702
	 *
	 * XXX2 why require it then?  leap of semantics?
	 */
	KASSERT((bp->b_cflags & BC_NOCACHE) == 0);
#endif

	mutex_enter(&wl->wl_mtx);
	if (bp->b_flags & B_LOCKED) {
		TAILQ_REMOVE(&wl->wl_bufs, bp, b_wapbllist);
		WAPBL_PRINTF(WAPBL_PRINT_BUFFER2,
		    ("wapbl_add_buf thread %d.%d re-adding buf %p "
			"with %d bytes %d bcount\n",
			curproc->p_pid, curlwp->l_lid, bp,
			bp->b_bufsize, bp->b_bcount));
	} else {
		/* Unlocked but dirty buffers shouldn't exist. */
		KASSERT(!(bp->b_oflags & BO_DELWRI));
		wl->wl_bufbytes += bp->b_bufsize;
		wl->wl_bcount += bp->b_bcount;
		wl->wl_bufcount++;
		WAPBL_PRINTF(WAPBL_PRINT_BUFFER,
		    ("wapbl_add_buf thread %d.%d adding buf %p "
			"with %d bytes %d bcount\n",
			curproc->p_pid, curlwp->l_lid, bp,
			bp->b_bufsize, bp->b_bcount));
	}
	TAILQ_INSERT_TAIL(&wl->wl_bufs, bp, b_wapbllist);
	mutex_exit(&wl->wl_mtx);

	bp->b_flags |= B_LOCKED;
}

static void
wapbl_remove_buf_locked(struct wapbl * wl, struct buf *bp)
{

	KASSERT(mutex_owned(&wl->wl_mtx));
	KASSERT(bp->b_cflags & BC_BUSY);
	wapbl_jlock_assert(wl);

#if 0
	/*
	 * XXX this might be an issue for swapfiles.
	 * see uvm_swap.c:1725
	 *
	 * XXXdeux: see above
	 */
	KASSERT((bp->b_cflags & BC_NOCACHE) == 0);
#endif
	KASSERT(bp->b_flags & B_LOCKED);

	WAPBL_PRINTF(WAPBL_PRINT_BUFFER,
	    ("wapbl_remove_buf thread %d.%d removing buf %p with "
		"%d bytes %d bcount\n",
		curproc->p_pid, curlwp->l_lid, bp,
		bp->b_bufsize, bp->b_bcount));

	KASSERT(wl->wl_bufbytes >= bp->b_bufsize);
	wl->wl_bufbytes -= bp->b_bufsize;
	KASSERT(wl->wl_bcount >= bp->b_bcount);
	wl->wl_bcount -= bp->b_bcount;
	KASSERT(wl->wl_bufcount > 0);
	wl->wl_bufcount--;
	KASSERT((wl->wl_bufcount == 0) == (wl->wl_bufbytes == 0));
	KASSERT((wl->wl_bufcount == 0) == (wl->wl_bcount == 0));
	TAILQ_REMOVE(&wl->wl_bufs, bp, b_wapbllist);

	bp->b_flags &= ~B_LOCKED;
}

/* called from brelsel() in vfs_bio among other places */
void
wapbl_remove_buf(struct wapbl * wl, struct buf *bp)
{

	mutex_enter(&wl->wl_mtx);
	wapbl_remove_buf_locked(wl, bp);
	mutex_exit(&wl->wl_mtx);
}

void
wapbl_resize_buf(struct wapbl *wl, struct buf *bp, long oldsz, long oldcnt)
{

	KASSERT(bp->b_cflags & BC_BUSY);

	/*
	 * XXX: why does this depend on B_LOCKED?  otherwise the buf
	 * is not for a transaction?  if so, why is this called in the
	 * first place?
	 */
	if (bp->b_flags & B_LOCKED) {
		mutex_enter(&wl->wl_mtx);
		wl->wl_bufbytes += bp->b_bufsize - oldsz;
		wl->wl_bcount += bp->b_bcount - oldcnt;
		mutex_exit(&wl->wl_mtx);
	}
}

#endif /* _KERNEL */

/****************************************************************/
/* Some utility inlines */

/*
 * wapbl_space_used(avail, head, tail)
 *
 *	Number of bytes used in a circular queue of avail total bytes,
 *	from tail to head.
 */
static inline size_t
wapbl_space_used(size_t avail, off_t head, off_t tail)
{

	if (tail == 0) {
		KASSERT(head == 0);
		return 0;
	}
	return ((head + (avail - 1) - tail) % avail) + 1;
}
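
/*
 * Illustrative check of the formula above (a sketch, never compiled;
 * the helper name and values are assumptions for demonstration only):
 */
#if 0
#include <assert.h>
#include <stddef.h>

static size_t
space_used_demo(size_t avail, long long head, long long tail)
{
	/* Same arithmetic as wapbl_space_used(), on raw positions. */
	if (tail == 0)
		return 0;
	return ((head + (avail - 1) - tail) % avail) + 1;
}

int
main(void)
{
	assert(space_used_demo(10, 8, 2) == 6);		/* no wrap: 2..7 */
	assert(space_used_demo(10, 2, 8) == 4);		/* wraps: 8,9,0,1 */
	assert(space_used_demo(10, 5, 5) == 10);	/* head==tail!=0: full */
	assert(space_used_demo(10, 0, 0) == 0);		/* empty */
	return 0;
}
#endif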

#ifdef _KERNEL
/*
 * wapbl_advance(size, off, oldoff, delta)
 *
 *	Given a byte offset oldoff into a circular queue of size bytes
 *	starting at off, return a new byte offset oldoff + delta into
 *	the circular queue.
 */
static inline off_t
wapbl_advance(size_t size, size_t off, off_t oldoff, size_t delta)
{
	off_t newoff;

	/* Define acceptable ranges for inputs. */
	KASSERT(delta <= (size_t)size);
	KASSERT(oldoff == 0 || (size_t)oldoff >= off);
	KASSERT(oldoff < (off_t)(size + off));

	if (oldoff == 0 && delta != 0)
		newoff = off + delta;
	else if (oldoff + delta < size + off)
		newoff = oldoff + delta;
	else
		newoff = (oldoff + delta) - size;

	/* Note some interesting axioms */
	KASSERT(delta != 0 || newoff == oldoff);
	KASSERT(delta == 0 || newoff != 0);
	KASSERT(delta != size || newoff == oldoff);

	/* Define acceptable ranges for output. */
	KASSERT(newoff == 0 || (size_t)newoff >= off);
	KASSERT((size_t)newoff < size + off);
	return newoff;
}
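
/*
 * Illustrative example (assumed values): with size = 8192 and
 * off = 1024, wapbl_advance(8192, 1024, 8704, 1024) wraps to
 * (8704 + 1024) - 8192 = 1536; advancing from oldoff = 0 (the
 * empty-log marker) by a nonzero delta instead starts at off + delta.
 */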

/*
 * wapbl_space_free(avail, head, tail)
 *
 *	Number of bytes free in a circular queue of avail total bytes,
 *	in which everything from tail to head is used.
 */
static inline size_t
wapbl_space_free(size_t avail, off_t head, off_t tail)
{

	return avail - wapbl_space_used(avail, head, tail);
}

/*
 * wapbl_advance_head(size, off, delta, headp, tailp)
 *
 *	In a circular queue of size bytes starting at off, given the
 *	old head and tail offsets *headp and *tailp, store the new head
 *	and tail offsets in *headp and *tailp resulting from adding
 *	delta bytes of data to the head.
 */
static inline void
wapbl_advance_head(size_t size, size_t off, size_t delta, off_t *headp,
    off_t *tailp)
{
	off_t head = *headp;
	off_t tail = *tailp;

	KASSERT(delta <= wapbl_space_free(size, head, tail));
	head = wapbl_advance(size, off, head, delta);
	if (tail == 0 && head != 0)
		tail = off;
	*headp = head;
	*tailp = tail;
}

/*
 * wapbl_advance_tail(size, off, delta, headp, tailp)
 *
 *	In a circular queue of size bytes starting at off, given the
 *	old head and tail offsets *headp and *tailp, store the new head
 *	and tail offsets in *headp and *tailp resulting from removing
 *	delta bytes of data from the tail.
 */
static inline void
wapbl_advance_tail(size_t size, size_t off, size_t delta, off_t *headp,
    off_t *tailp)
{
	off_t head = *headp;
	off_t tail = *tailp;

	KASSERT(delta <= wapbl_space_used(size, head, tail));
	tail = wapbl_advance(size, off, tail, delta);
	if (head == tail) {
		head = tail = 0;
	}
	*headp = head;
	*tailp = tail;
}
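
/*
 * Illustrative lifecycle (assumed values, size = 8192, off = 1024):
 * starting from an empty log (head = tail = 0), adding a 1024-byte
 * record with wapbl_advance_head() yields tail = 1024 (= off) and
 * head = 2048; once that record completes, wapbl_advance_tail() with
 * delta = 1024 makes head == tail, so both reset to 0 (empty again).
 */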


/****************************************************************/

/*
 * wapbl_truncate(wl, minfree)
 *
 *	Wait until at least minfree bytes are available in the log.
 *
 *	If it was necessary to wait for writes to complete,
 *	advance the circular queue tail to reflect the new write
 *	completions and issue a write commit to the log.
 *
 *	=> Caller must hold wl->wl_rwlock writer lock.
 */
static int
wapbl_truncate(struct wapbl *wl, size_t minfree)
{
	size_t delta;
	size_t avail;
	off_t head;
	off_t tail;
	int error = 0;

	KASSERT(minfree <= (wl->wl_circ_size - wl->wl_reserved_bytes));
	KASSERT(rw_write_held(&wl->wl_rwlock));

	mutex_enter(&wl->wl_mtx);

	/*
	 * First check to see if we have to do a commit
	 * at all.
	 */
	avail = wapbl_space_free(wl->wl_circ_size, wl->wl_head, wl->wl_tail);
	if (minfree < avail) {
		mutex_exit(&wl->wl_mtx);
		return 0;
	}
	minfree -= avail;
	while (wl->wl_error_count == 0 &&
	    wl->wl_reclaimable_bytes < minfree) {
		WAPBL_PRINTF(WAPBL_PRINT_TRUNCATE,
		    ("wapbl_truncate: sleeping on %p"
			" wl=%p bytes=%zd minfree=%zd\n",
			&wl->wl_reclaimable_bytes,
			wl, wl->wl_reclaimable_bytes, minfree));
		cv_wait(&wl->wl_reclaimable_cv, &wl->wl_mtx);
	}
	if (wl->wl_reclaimable_bytes < minfree) {
		KASSERT(wl->wl_error_count);
		/* XXX maybe get actual error from buffer instead someday? */
		error = EIO;
	}
	head = wl->wl_head;
	tail = wl->wl_tail;
	delta = wl->wl_reclaimable_bytes;

	/*
	 * If all of the entries are flushed, then be sure to keep
	 * the reserved bytes reserved.  Watch out for discarded
	 * transactions, which could leave more bytes reserved than
	 * are reclaimable.
	 */
	if (SIMPLEQ_EMPTY(&wl->wl_entries) && delta >= wl->wl_reserved_bytes) {
		delta -= wl->wl_reserved_bytes;
	}
	wapbl_advance_tail(wl->wl_circ_size, wl->wl_circ_off, delta, &head,
	    &tail);
	KDASSERT(wl->wl_reserved_bytes <=
	    wapbl_space_used(wl->wl_circ_size, head, tail));
	mutex_exit(&wl->wl_mtx);

	if (error)
		return error;

	/*
	 * This is where head, tail and delta are unprotected
	 * from races against itself or flush.  This is ok since
	 * we only call this routine from inside flush itself.
	 *
	 * XXX: how can it race against itself when accessed only
	 * from behind the write-locked rwlock?
	 */
	error = wapbl_write_commit(wl, head, tail);
	if (error)
		return error;

	wl->wl_head = head;
	wl->wl_tail = tail;

	mutex_enter(&wl->wl_mtx);
	KASSERT(wl->wl_reclaimable_bytes >= delta);
	wl->wl_reclaimable_bytes -= delta;
	mutex_exit(&wl->wl_mtx);
	WAPBL_PRINTF(WAPBL_PRINT_TRUNCATE,
	    ("wapbl_truncate thread %d.%d truncating %zu bytes\n",
		curproc->p_pid, curlwp->l_lid, delta));

	return 0;
}
   1612 
   1613 /****************************************************************/
   1614 
   1615 void
   1616 wapbl_biodone(struct buf *bp)
   1617 {
   1618 	struct wapbl_entry *we = bp->b_private;
   1619 	struct wapbl *wl;
   1620 #ifdef WAPBL_DEBUG_BUFBYTES
   1621 	const int bufsize = bp->b_bufsize;
   1622 #endif
   1623 
   1624 	mutex_enter(&bufcache_lock);
   1625 	wl = we->we_wapbl;
   1626 	mutex_exit(&bufcache_lock);
   1627 
   1628 	/*
    1629 	 * Handle possible flushing of buffers after the log has been
    1630 	 * decommissioned.
   1631 	 */
   1632 	if (!wl) {
   1633 		KASSERT(we->we_bufcount > 0);
   1634 		we->we_bufcount--;
   1635 #ifdef WAPBL_DEBUG_BUFBYTES
   1636 		KASSERT(we->we_unsynced_bufbytes >= bufsize);
   1637 		we->we_unsynced_bufbytes -= bufsize;
   1638 #endif
   1639 
   1640 		if (we->we_bufcount == 0) {
   1641 #ifdef WAPBL_DEBUG_BUFBYTES
   1642 			KASSERT(we->we_unsynced_bufbytes == 0);
   1643 #endif
   1644 			pool_put(&wapbl_entry_pool, we);
   1645 		}
   1646 
   1647 		brelse(bp, 0);
   1648 		return;
   1649 	}
   1650 
   1651 #ifdef ohbother
   1652 	KDASSERT(bp->b_oflags & BO_DONE);
   1653 	KDASSERT(!(bp->b_oflags & BO_DELWRI));
   1654 	KDASSERT(bp->b_flags & B_ASYNC);
   1655 	KDASSERT(bp->b_cflags & BC_BUSY);
   1656 	KDASSERT(!(bp->b_flags & B_LOCKED));
   1657 	KDASSERT(!(bp->b_flags & B_READ));
   1658 	KDASSERT(!(bp->b_cflags & BC_INVAL));
   1659 	KDASSERT(!(bp->b_cflags & BC_NOCACHE));
   1660 #endif
   1661 
   1662 	if (bp->b_error) {
   1663 		/*
   1664 		 * If an error occurs, it would be nice to leave the buffer
   1665 		 * as a delayed write on the LRU queue so that we can retry
   1666 		 * it later. But buffercache(9) can't handle dirty buffer
   1667 		 * reuse, so just mark the log permanently errored out.
   1668 		 */
   1669 		mutex_enter(&wl->wl_mtx);
   1670 		if (wl->wl_error_count == 0) {
   1671 			wl->wl_error_count++;
   1672 			cv_broadcast(&wl->wl_reclaimable_cv);
   1673 		}
   1674 		mutex_exit(&wl->wl_mtx);
   1675 	}
   1676 
   1677 	/*
   1678 	 * Make sure that the buf doesn't retain the media flags, so that
    1679 	 * e.g. wapbl_allow_dpofua has immediate effect on any following I/O.
   1680 	 * The flags will be set again if needed by another I/O.
   1681 	 */
   1682 	bp->b_flags &= ~B_MEDIA_FLAGS;
   1683 
   1684 	/*
   1685 	 * Release the buffer here. wapbl_flush() may wait for the
   1686 	 * log to become empty and we better unbusy the buffer before
   1687 	 * wapbl_flush() returns.
   1688 	 */
   1689 	brelse(bp, 0);
   1690 
   1691 	mutex_enter(&wl->wl_mtx);
   1692 
   1693 	KASSERT(we->we_bufcount > 0);
   1694 	we->we_bufcount--;
   1695 #ifdef WAPBL_DEBUG_BUFBYTES
   1696 	KASSERT(we->we_unsynced_bufbytes >= bufsize);
   1697 	we->we_unsynced_bufbytes -= bufsize;
   1698 	KASSERT(wl->wl_unsynced_bufbytes >= bufsize);
   1699 	wl->wl_unsynced_bufbytes -= bufsize;
   1700 #endif
   1701 	wl->wl_ev_metawrite.ev_count++;
   1702 
   1703 	/*
   1704 	 * If the current transaction can be reclaimed, start
   1705 	 * at the beginning and reclaim any consecutive reclaimable
   1706 	 * transactions.  If we successfully reclaim anything,
   1707 	 * then wakeup anyone waiting for the reclaim.
   1708 	 */
   1709 	if (we->we_bufcount == 0) {
   1710 		size_t delta = 0;
   1711 		int errcnt = 0;
   1712 #ifdef WAPBL_DEBUG_BUFBYTES
   1713 		KDASSERT(we->we_unsynced_bufbytes == 0);
   1714 #endif
   1715 		/*
    1716 		 * Clear any posted error, since the buffer it came from
    1717 		 * has by now been successfully flushed.
   1718 		 */
   1719 		while ((we = SIMPLEQ_FIRST(&wl->wl_entries)) &&
   1720 		    we->we_bufcount == 0) {
   1721 			delta += we->we_reclaimable_bytes;
   1722 			if (we->we_error)
   1723 				errcnt++;
   1724 			SIMPLEQ_REMOVE_HEAD(&wl->wl_entries, we_entries);
   1725 			pool_put(&wapbl_entry_pool, we);
   1726 		}
   1727 
   1728 		if (delta) {
   1729 			wl->wl_reclaimable_bytes += delta;
   1730 			KASSERT(wl->wl_error_count >= errcnt);
   1731 			wl->wl_error_count -= errcnt;
   1732 			cv_broadcast(&wl->wl_reclaimable_cv);
   1733 		}
   1734 	}
   1735 
   1736 	mutex_exit(&wl->wl_mtx);
   1737 }
   1738 
   1739 /*
   1740  * wapbl_flush(wl, wait)
   1741  *
   1742  *	Flush pending block writes, deallocations, and inodes from
   1743  *	the current transaction in memory to the log on disk:
   1744  *
   1745  *	1. Call the file system's wl_flush callback to flush any
   1746  *	   per-file-system pending updates.
   1747  *	2. Wait for enough space in the log for the current transaction.
   1748  *	3. Synchronously write the new log records, advancing the
   1749  *	   circular queue head.
   1750  *	4. Issue the pending block writes asynchronously, now that they
   1751  *	   are recorded in the log and can be replayed after crash.
   1752  *	5. If wait is true, wait for all writes to complete and for the
   1753  *	   log to become empty.
   1754  *
   1755  *	On failure, call the file system's wl_flush_abort callback.
   1756  */
   1757 int
   1758 wapbl_flush(struct wapbl *wl, int waitfor)
   1759 {
   1760 	struct buf *bp;
   1761 	struct wapbl_entry *we;
   1762 	off_t off;
   1763 	off_t head;
   1764 	off_t tail;
   1765 	size_t delta = 0;
   1766 	size_t flushsize;
   1767 	size_t reserved;
   1768 	int error = 0;
   1769 
   1770 	/*
    1771 	 * Do a quick check to see if a full flush can be skipped.
    1772 	 * This assumes that the flush callback does not need to be called
    1773 	 * unless there are outstanding bufs.
   1774 	 */
   1775 	if (!waitfor) {
   1776 		size_t nbufs;
   1777 		mutex_enter(&wl->wl_mtx);	/* XXX need mutex here to
   1778 						   protect the KASSERTS */
   1779 		nbufs = wl->wl_bufcount;
   1780 		KASSERT((wl->wl_bufcount == 0) == (wl->wl_bufbytes == 0));
   1781 		KASSERT((wl->wl_bufcount == 0) == (wl->wl_bcount == 0));
   1782 		mutex_exit(&wl->wl_mtx);
   1783 		if (nbufs == 0)
   1784 			return 0;
   1785 	}
   1786 
   1787 	/*
   1788 	 * XXX we may consider using LK_UPGRADE here
   1789 	 * if we want to call flush from inside a transaction
   1790 	 */
   1791 	rw_enter(&wl->wl_rwlock, RW_WRITER);
   1792 	wl->wl_flush(wl->wl_mount, TAILQ_FIRST(&wl->wl_dealloclist));
   1793 
   1794 	/*
   1795 	 * Now that we are exclusively locked and the file system has
   1796 	 * issued any deferred block writes for this transaction, check
   1797 	 * whether there are any blocks to write to the log.  If not,
   1798 	 * skip waiting for space or writing any log entries.
   1799 	 *
   1800 	 * XXX Shouldn't this also check wl_dealloccnt and
   1801 	 * wl_inohashcnt?  Perhaps wl_dealloccnt doesn't matter if the
   1802 	 * file system didn't produce any blocks as a consequence of
   1803 	 * it, but the same does not seem to be so of wl_inohashcnt.
   1804 	 */
   1805 	if (wl->wl_bufcount == 0) {
   1806 		goto wait_out;
   1807 	}
   1808 
   1809 #if 0
   1810 	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
   1811 	    ("wapbl_flush thread %d.%d flushing entries with "
   1812 		"bufcount=%zu bufbytes=%zu\n",
   1813 		curproc->p_pid, curlwp->l_lid, wl->wl_bufcount,
   1814 		wl->wl_bufbytes));
   1815 #endif
   1816 
   1817 	/* Calculate amount of space needed to flush */
   1818 	flushsize = wapbl_transaction_len(wl);
   1819 	if (wapbl_verbose_commit) {
   1820 		struct timespec ts;
   1821 		getnanotime(&ts);
   1822 		printf("%s: %lld.%09ld this transaction = %zu bytes\n",
   1823 		    __func__, (long long)ts.tv_sec,
   1824 		    (long)ts.tv_nsec, flushsize);
   1825 	}
   1826 
   1827 	if (flushsize > (wl->wl_circ_size - wl->wl_reserved_bytes)) {
   1828 		/*
   1829 		 * XXX this could be handled more gracefully, perhaps place
   1830 		 * only a partial transaction in the log and allow the
   1831 		 * remaining to flush without the protection of the journal.
   1832 		 */
   1833 		panic("wapbl_flush: current transaction too big to flush");
   1834 	}
   1835 
   1836 	error = wapbl_truncate(wl, flushsize);
   1837 	if (error)
   1838 		goto out;
   1839 
   1840 	off = wl->wl_head;
   1841 	KASSERT(off == 0 || off >= wl->wl_circ_off);
   1842 	KASSERT(off == 0 || off < wl->wl_circ_off + wl->wl_circ_size);
   1843 	error = wapbl_write_blocks(wl, &off);
   1844 	if (error)
   1845 		goto out;
   1846 	error = wapbl_write_revocations(wl, &off);
   1847 	if (error)
   1848 		goto out;
   1849 	error = wapbl_write_inodes(wl, &off);
   1850 	if (error)
   1851 		goto out;
   1852 
   1853 	reserved = 0;
   1854 	if (wl->wl_inohashcnt)
   1855 		reserved = wapbl_transaction_inodes_len(wl);
   1856 
   1857 	head = wl->wl_head;
   1858 	tail = wl->wl_tail;
   1859 
   1860 	wapbl_advance_head(wl->wl_circ_size, wl->wl_circ_off, flushsize,
   1861 	    &head, &tail);
   1862 
   1863 	KASSERTMSG(head == off,
   1864 	    "lost head! head=%"PRIdMAX" tail=%" PRIdMAX
   1865 	    " off=%"PRIdMAX" flush=%zu",
   1866 	    (intmax_t)head, (intmax_t)tail, (intmax_t)off,
   1867 	    flushsize);
   1868 
   1869 	/* Opportunistically move the tail forward if we can */
   1870 	mutex_enter(&wl->wl_mtx);
   1871 	delta = wl->wl_reclaimable_bytes;
   1872 	mutex_exit(&wl->wl_mtx);
   1873 	wapbl_advance_tail(wl->wl_circ_size, wl->wl_circ_off, delta,
   1874 	    &head, &tail);
   1875 
   1876 	error = wapbl_write_commit(wl, head, tail);
   1877 	if (error)
   1878 		goto out;
   1879 
   1880 	we = pool_get(&wapbl_entry_pool, PR_WAITOK);
   1881 
   1882 #ifdef WAPBL_DEBUG_BUFBYTES
   1883 	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
   1884 	    ("wapbl_flush: thread %d.%d head+=%zu tail+=%zu used=%zu"
   1885 		" unsynced=%zu"
   1886 		"\n\tbufcount=%zu bufbytes=%zu bcount=%zu deallocs=%d "
   1887 		"inodes=%d\n",
   1888 		curproc->p_pid, curlwp->l_lid, flushsize, delta,
   1889 		wapbl_space_used(wl->wl_circ_size, head, tail),
   1890 		wl->wl_unsynced_bufbytes, wl->wl_bufcount,
   1891 		wl->wl_bufbytes, wl->wl_bcount, wl->wl_dealloccnt,
   1892 		wl->wl_inohashcnt));
   1893 #else
   1894 	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
   1895 	    ("wapbl_flush: thread %d.%d head+=%zu tail+=%zu used=%zu"
   1896 		"\n\tbufcount=%zu bufbytes=%zu bcount=%zu deallocs=%d "
   1897 		"inodes=%d\n",
   1898 		curproc->p_pid, curlwp->l_lid, flushsize, delta,
   1899 		wapbl_space_used(wl->wl_circ_size, head, tail),
   1900 		wl->wl_bufcount, wl->wl_bufbytes, wl->wl_bcount,
   1901 		wl->wl_dealloccnt, wl->wl_inohashcnt));
   1902 #endif
   1903 
   1904 
   1905 	mutex_enter(&bufcache_lock);
   1906 	mutex_enter(&wl->wl_mtx);
   1907 
   1908 	wl->wl_reserved_bytes = reserved;
   1909 	wl->wl_head = head;
   1910 	wl->wl_tail = tail;
   1911 	KASSERT(wl->wl_reclaimable_bytes >= delta);
   1912 	wl->wl_reclaimable_bytes -= delta;
   1913 	KDASSERT(wl->wl_dealloccnt == 0);
   1914 #ifdef WAPBL_DEBUG_BUFBYTES
   1915 	wl->wl_unsynced_bufbytes += wl->wl_bufbytes;
   1916 #endif
   1917 
   1918 	we->we_wapbl = wl;
   1919 	we->we_bufcount = wl->wl_bufcount;
   1920 #ifdef WAPBL_DEBUG_BUFBYTES
   1921 	we->we_unsynced_bufbytes = wl->wl_bufbytes;
   1922 #endif
   1923 	we->we_reclaimable_bytes = flushsize;
   1924 	we->we_error = 0;
   1925 	SIMPLEQ_INSERT_TAIL(&wl->wl_entries, we, we_entries);
   1926 
   1927 	/*
    1928 	 * This flushes bufs in the order they were queued, so the LRU
   1929 	 * order is preserved.
   1930 	 */
   1931 	while ((bp = TAILQ_FIRST(&wl->wl_bufs)) != NULL) {
   1932 		if (bbusy(bp, 0, 0, &wl->wl_mtx)) {
   1933 			continue;
   1934 		}
   1935 		bp->b_iodone = wapbl_biodone;
   1936 		bp->b_private = we;
   1937 
   1938 		bremfree(bp);
   1939 		wapbl_remove_buf_locked(wl, bp);
   1940 		mutex_exit(&wl->wl_mtx);
   1941 		mutex_exit(&bufcache_lock);
   1942 		bawrite(bp);
   1943 		mutex_enter(&bufcache_lock);
   1944 		mutex_enter(&wl->wl_mtx);
   1945 	}
   1946 	mutex_exit(&wl->wl_mtx);
   1947 	mutex_exit(&bufcache_lock);
   1948 
   1949 #if 0
   1950 	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
   1951 	    ("wapbl_flush thread %d.%d done flushing entries...\n",
   1952 		curproc->p_pid, curlwp->l_lid));
   1953 #endif
   1954 
   1955 wait_out:
   1956 
   1957 	/*
   1958 	 * If the waitfor flag is set, don't return until everything is
    1959 	 * fully flushed and the on-disk log is empty.
   1960 	 */
   1961 	if (waitfor) {
   1962 		error = wapbl_truncate(wl, wl->wl_circ_size -
   1963 		    wl->wl_reserved_bytes);
   1964 	}
   1965 
   1966 out:
   1967 	if (error) {
   1968 		wl->wl_flush_abort(wl->wl_mount,
   1969 		    TAILQ_FIRST(&wl->wl_dealloclist));
   1970 	}
   1971 
   1972 #ifdef WAPBL_DEBUG_PRINT
   1973 	if (error) {
   1974 		pid_t pid = -1;
   1975 		lwpid_t lid = -1;
   1976 		if (curproc)
   1977 			pid = curproc->p_pid;
   1978 		if (curlwp)
   1979 			lid = curlwp->l_lid;
   1980 		mutex_enter(&wl->wl_mtx);
   1981 #ifdef WAPBL_DEBUG_BUFBYTES
   1982 		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
   1983 		    ("wapbl_flush: thread %d.%d aborted flush: "
   1984 			"error = %d\n"
   1985 			"\tbufcount=%zu bufbytes=%zu bcount=%zu "
   1986 			"deallocs=%d inodes=%d\n"
   1987 			"\terrcnt = %d, reclaimable=%zu reserved=%zu "
   1988 			"unsynced=%zu\n",
   1989 			pid, lid, error, wl->wl_bufcount,
   1990 			wl->wl_bufbytes, wl->wl_bcount,
   1991 			wl->wl_dealloccnt, wl->wl_inohashcnt,
   1992 			wl->wl_error_count, wl->wl_reclaimable_bytes,
   1993 			wl->wl_reserved_bytes, wl->wl_unsynced_bufbytes));
   1994 		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
   1995 			WAPBL_PRINTF(WAPBL_PRINT_ERROR,
   1996 			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
   1997 				"error = %d, unsynced = %zu\n",
   1998 				we->we_bufcount, we->we_reclaimable_bytes,
   1999 				we->we_error, we->we_unsynced_bufbytes));
   2000 		}
   2001 #else
   2002 		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
   2003 		    ("wapbl_flush: thread %d.%d aborted flush: "
   2004 			"error = %d\n"
   2005 			"\tbufcount=%zu bufbytes=%zu bcount=%zu "
   2006 			"deallocs=%d inodes=%d\n"
   2007 			"\terrcnt = %d, reclaimable=%zu reserved=%zu\n",
   2008 			pid, lid, error, wl->wl_bufcount,
   2009 			wl->wl_bufbytes, wl->wl_bcount,
   2010 			wl->wl_dealloccnt, wl->wl_inohashcnt,
   2011 			wl->wl_error_count, wl->wl_reclaimable_bytes,
   2012 			wl->wl_reserved_bytes));
   2013 		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
   2014 			WAPBL_PRINTF(WAPBL_PRINT_ERROR,
   2015 			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
   2016 				"error = %d\n", we->we_bufcount,
   2017 				we->we_reclaimable_bytes, we->we_error));
   2018 		}
   2019 #endif
   2020 		mutex_exit(&wl->wl_mtx);
   2021 	}
   2022 #endif
   2023 
   2024 	rw_exit(&wl->wl_rwlock);
   2025 	return error;
   2026 }
   2027 
   2028 /****************************************************************/
   2029 
   2030 void
   2031 wapbl_jlock_assert(struct wapbl *wl)
   2032 {
   2033 
   2034 	KASSERT(rw_lock_held(&wl->wl_rwlock));
   2035 }
   2036 
   2037 void
   2038 wapbl_junlock_assert(struct wapbl *wl)
   2039 {
   2040 
   2041 	KASSERT(!rw_write_held(&wl->wl_rwlock));
   2042 }
   2043 
   2044 /****************************************************************/
   2045 
   2046 /* locks missing */
   2047 void
   2048 wapbl_print(struct wapbl *wl, int full, void (*pr)(const char *, ...))
   2049 {
   2050 	struct buf *bp;
   2051 	struct wapbl_entry *we;
   2052 	(*pr)("wapbl %p", wl);
   2053 	(*pr)("\nlogvp = %p, devvp = %p, logpbn = %"PRId64"\n",
   2054 	    wl->wl_logvp, wl->wl_devvp, wl->wl_logpbn);
   2055 	(*pr)("circ = %zu, header = %zu,"
   2056 	    " head = %"PRIdMAX" tail = %"PRIdMAX"\n",
   2057 	    wl->wl_circ_size, wl->wl_circ_off,
   2058 	    (intmax_t)wl->wl_head, (intmax_t)wl->wl_tail);
    2059 	(*pr)("log_dev_bshift = %d, fs_dev_bshift = %d\n",
    2060 	    wl->wl_log_dev_bshift, wl->wl_fs_dev_bshift);
   2061 #ifdef WAPBL_DEBUG_BUFBYTES
   2062 	(*pr)("bufcount = %zu, bufbytes = %zu bcount = %zu reclaimable = %zu "
   2063 	    "reserved = %zu errcnt = %d unsynced = %zu\n",
   2064 	    wl->wl_bufcount, wl->wl_bufbytes, wl->wl_bcount,
   2065 	    wl->wl_reclaimable_bytes, wl->wl_reserved_bytes,
   2066 	    wl->wl_error_count, wl->wl_unsynced_bufbytes);
   2067 #else
   2068 	(*pr)("bufcount = %zu, bufbytes = %zu bcount = %zu reclaimable = %zu "
   2069 	    "reserved = %zu errcnt = %d\n", wl->wl_bufcount, wl->wl_bufbytes,
   2070 	    wl->wl_bcount, wl->wl_reclaimable_bytes, wl->wl_reserved_bytes,
   2071 	    wl->wl_error_count);
   2072 #endif
   2073 	(*pr)("\tdealloccnt = %d, dealloclim = %d\n",
   2074 	    wl->wl_dealloccnt, wl->wl_dealloclim);
   2075 	(*pr)("\tinohashcnt = %d, inohashmask = 0x%08x\n",
   2076 	    wl->wl_inohashcnt, wl->wl_inohashmask);
   2077 	(*pr)("entries:\n");
   2078 	SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
   2079 #ifdef WAPBL_DEBUG_BUFBYTES
   2080 		(*pr)("\tbufcount = %zu, reclaimable = %zu, error = %d, "
   2081 		    "unsynced = %zu\n",
   2082 		    we->we_bufcount, we->we_reclaimable_bytes,
   2083 		    we->we_error, we->we_unsynced_bufbytes);
   2084 #else
   2085 		(*pr)("\tbufcount = %zu, reclaimable = %zu, error = %d\n",
   2086 		    we->we_bufcount, we->we_reclaimable_bytes, we->we_error);
   2087 #endif
   2088 	}
   2089 	if (full) {
   2090 		int cnt = 0;
   2091 		(*pr)("bufs =");
   2092 		TAILQ_FOREACH(bp, &wl->wl_bufs, b_wapbllist) {
   2093 			if (!TAILQ_NEXT(bp, b_wapbllist)) {
   2094 				(*pr)(" %p", bp);
   2095 			} else if ((++cnt % 6) == 0) {
   2096 				(*pr)(" %p,\n\t", bp);
   2097 			} else {
   2098 				(*pr)(" %p,", bp);
   2099 			}
   2100 		}
   2101 		(*pr)("\n");
   2102 
   2103 		(*pr)("dealloced blks = ");
   2104 		{
   2105 			struct wapbl_dealloc *wd;
   2106 			cnt = 0;
   2107 			TAILQ_FOREACH(wd, &wl->wl_dealloclist, wd_entries) {
   2108 				(*pr)(" %"PRId64":%d,",
   2109 				    wd->wd_blkno,
   2110 				    wd->wd_len);
   2111 				if ((++cnt % 4) == 0) {
   2112 					(*pr)("\n\t");
   2113 				}
   2114 			}
   2115 		}
   2116 		(*pr)("\n");
   2117 
   2118 		(*pr)("registered inodes = ");
   2119 		{
   2120 			int i;
   2121 			cnt = 0;
   2122 			for (i = 0; i <= wl->wl_inohashmask; i++) {
   2123 				struct wapbl_ino_head *wih;
   2124 				struct wapbl_ino *wi;
   2125 
   2126 				wih = &wl->wl_inohash[i];
   2127 				LIST_FOREACH(wi, wih, wi_hash) {
   2128 					if (wi->wi_ino == 0)
   2129 						continue;
   2130 					(*pr)(" %"PRIu64"/0%06"PRIo32",",
   2131 					    wi->wi_ino, wi->wi_mode);
   2132 					if ((++cnt % 4) == 0) {
   2133 						(*pr)("\n\t");
   2134 					}
   2135 				}
   2136 			}
   2137 			(*pr)("\n");
   2138 		}
   2139 
    2140 		(*pr)("iobufs free =");
         		cnt = 0;
    2141 		TAILQ_FOREACH(bp, &wl->wl_iobufs, b_wapbllist) {
   2142 			if (!TAILQ_NEXT(bp, b_wapbllist)) {
   2143 				(*pr)(" %p", bp);
   2144 			} else if ((++cnt % 6) == 0) {
   2145 				(*pr)(" %p,\n\t", bp);
   2146 			} else {
   2147 				(*pr)(" %p,", bp);
   2148 			}
   2149 		}
   2150 		(*pr)("\n");
   2151 
    2152 		(*pr)("iobufs busy =");
         		cnt = 0;
    2153 		TAILQ_FOREACH(bp, &wl->wl_iobufs_busy, b_wapbllist) {
   2154 			if (!TAILQ_NEXT(bp, b_wapbllist)) {
   2155 				(*pr)(" %p", bp);
   2156 			} else if ((++cnt % 6) == 0) {
   2157 				(*pr)(" %p,\n\t", bp);
   2158 			} else {
   2159 				(*pr)(" %p,", bp);
   2160 			}
   2161 		}
   2162 		(*pr)("\n");
   2163 	}
   2164 }
   2165 
   2166 #if defined(WAPBL_DEBUG) || defined(DDB)
   2167 void
   2168 wapbl_dump(struct wapbl *wl)
   2169 {
   2170 #if defined(WAPBL_DEBUG)
   2171 	if (!wl)
   2172 		wl = wapbl_debug_wl;
   2173 #endif
   2174 	if (!wl)
   2175 		return;
   2176 	wapbl_print(wl, 1, printf);
   2177 }
   2178 #endif
   2179 
   2180 /****************************************************************/
   2181 
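         /*
          * wapbl_register_deallocation(wl, blk, len, force, cookiep)
          *
          *	Record a pending deallocation of len bytes at blk so that it
          *	can be written to the log as a revocation.  Return EAGAIN if
          *	the per-transaction limit is reached and force is false; a
          *	forced registration over the limit is allowed but reported to
          *	the console.  If cookiep is nonnull, return a cookie for
          *	wapbl_unregister_deallocation().
          */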
   2182 int
   2183 wapbl_register_deallocation(struct wapbl *wl, daddr_t blk, int len, bool force,
   2184     void **cookiep)
   2185 {
   2186 	struct wapbl_dealloc *wd;
   2187 	int error = 0;
   2188 
   2189 	wapbl_jlock_assert(wl);
   2190 
   2191 	mutex_enter(&wl->wl_mtx);
   2192 
   2193 	if (__predict_false(wl->wl_dealloccnt >= wl->wl_dealloclim)) {
   2194 		if (!force) {
   2195 			error = EAGAIN;
   2196 			goto out;
   2197 		}
   2198 
    2199 		/*
    2200 		 * Forced registration can only be used when:
    2201 		 * 1) the caller can't cope with failure
    2202 		 * 2) the path can only be triggered a small, bounded
    2203 		 *    number of times per transaction
    2204 		 * If these conditions are not fulfilled, and the path would
    2205 		 * be triggered many times, this could overflow the maximum
    2206 		 * transaction size and panic later.
    2207 		 */
   2208 		printf("%s: forced dealloc registration over limit:"
   2209 		    " %d >= %d\n",
   2210 		    wl->wl_mount->mnt_stat.f_mntonname,
   2211 		    wl->wl_dealloccnt, wl->wl_dealloclim);
   2212 	}
   2213 
   2214 	wl->wl_dealloccnt++;
   2215 	mutex_exit(&wl->wl_mtx);
   2216 
   2217 	wd = pool_get(&wapbl_dealloc_pool, PR_WAITOK);
   2218 	wd->wd_blkno = blk;
   2219 	wd->wd_len = len;
   2220 
   2221 	mutex_enter(&wl->wl_mtx);
   2222 	TAILQ_INSERT_TAIL(&wl->wl_dealloclist, wd, wd_entries);
   2223 
   2224 	if (cookiep)
   2225 		*cookiep = wd;
   2226 
   2227 out:
   2228 	mutex_exit(&wl->wl_mtx);
   2229 
   2230 	WAPBL_PRINTF(WAPBL_PRINT_ALLOC,
   2231 	    ("wapbl_register_deallocation: blk=%"PRId64" len=%d error=%d\n",
   2232 		blk, len, error));
   2233 
   2234 	return error;
   2235 }
   2236 
   2237 static void
   2238 wapbl_deallocation_free(struct wapbl *wl, struct wapbl_dealloc *wd,
   2239 	bool locked)
   2240 {
   2241 
   2242 	KASSERT(!locked
   2243 	    || rw_lock_held(&wl->wl_rwlock) || mutex_owned(&wl->wl_mtx));
   2244 
   2245 	if (!locked)
   2246 		mutex_enter(&wl->wl_mtx);
   2247 
   2248 	TAILQ_REMOVE(&wl->wl_dealloclist, wd, wd_entries);
   2249 	wl->wl_dealloccnt--;
   2250 
   2251 	if (!locked)
   2252 		mutex_exit(&wl->wl_mtx);
   2253 
   2254 	pool_put(&wapbl_dealloc_pool, wd);
   2255 }
   2256 
   2257 void
   2258 wapbl_unregister_deallocation(struct wapbl *wl, void *cookie)
   2259 {
   2260 
   2261 	KASSERT(cookie != NULL);
   2262 	wapbl_deallocation_free(wl, cookie, false);
   2263 }
   2264 
   2265 /****************************************************************/
   2266 
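         /*
          * Inode tracking.  The hash below records inode number/mode pairs
          * registered by the file system; wapbl_write_inodes() writes them
          * with every transaction flush and replay gathers them again.
          * (What the file system does with them at recovery time is outside
          * this file; typically they mark inodes that may need cleanup
          * after a crash.)
          */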
   2267 static void
   2268 wapbl_inodetrk_init(struct wapbl *wl, u_int size)
   2269 {
   2270 
   2271 	wl->wl_inohash = hashinit(size, HASH_LIST, true, &wl->wl_inohashmask);
   2272 	if (atomic_inc_uint_nv(&wapbl_ino_pool_refcount) == 1) {
   2273 		pool_init(&wapbl_ino_pool, sizeof(struct wapbl_ino), 0, 0, 0,
   2274 		    "wapblinopl", &pool_allocator_nointr, IPL_NONE);
   2275 	}
   2276 }
   2277 
   2278 static void
   2279 wapbl_inodetrk_free(struct wapbl *wl)
   2280 {
   2281 
   2282 	/* XXX this KASSERT needs locking/mutex analysis */
   2283 	KASSERT(wl->wl_inohashcnt == 0);
   2284 	hashdone(wl->wl_inohash, HASH_LIST, wl->wl_inohashmask);
   2285 	membar_release();
   2286 	if (atomic_dec_uint_nv(&wapbl_ino_pool_refcount) == 0) {
   2287 		membar_acquire();
   2288 		pool_destroy(&wapbl_ino_pool);
   2289 	}
   2290 }
   2291 
   2292 static struct wapbl_ino *
   2293 wapbl_inodetrk_get(struct wapbl *wl, ino_t ino)
   2294 {
   2295 	struct wapbl_ino_head *wih;
   2296 	struct wapbl_ino *wi;
   2297 
   2298 	KASSERT(mutex_owned(&wl->wl_mtx));
   2299 
   2300 	wih = &wl->wl_inohash[ino & wl->wl_inohashmask];
   2301 	LIST_FOREACH(wi, wih, wi_hash) {
   2302 		if (ino == wi->wi_ino)
   2303 			return wi;
   2304 	}
   2305 	return 0;
   2306 }
   2307 
   2308 void
   2309 wapbl_register_inode(struct wapbl *wl, ino_t ino, mode_t mode)
   2310 {
   2311 	struct wapbl_ino_head *wih;
   2312 	struct wapbl_ino *wi;
   2313 
   2314 	wi = pool_get(&wapbl_ino_pool, PR_WAITOK);
   2315 
   2316 	mutex_enter(&wl->wl_mtx);
   2317 	if (wapbl_inodetrk_get(wl, ino) == NULL) {
   2318 		wi->wi_ino = ino;
   2319 		wi->wi_mode = mode;
   2320 		wih = &wl->wl_inohash[ino & wl->wl_inohashmask];
   2321 		LIST_INSERT_HEAD(wih, wi, wi_hash);
   2322 		wl->wl_inohashcnt++;
   2323 		WAPBL_PRINTF(WAPBL_PRINT_INODE,
   2324 		    ("wapbl_register_inode: ino=%"PRId64"\n", ino));
   2325 		mutex_exit(&wl->wl_mtx);
   2326 	} else {
   2327 		mutex_exit(&wl->wl_mtx);
   2328 		pool_put(&wapbl_ino_pool, wi);
   2329 	}
   2330 }
   2331 
   2332 void
   2333 wapbl_unregister_inode(struct wapbl *wl, ino_t ino, mode_t mode)
   2334 {
   2335 	struct wapbl_ino *wi;
   2336 
   2337 	mutex_enter(&wl->wl_mtx);
   2338 	wi = wapbl_inodetrk_get(wl, ino);
   2339 	if (wi) {
   2340 		WAPBL_PRINTF(WAPBL_PRINT_INODE,
   2341 		    ("wapbl_unregister_inode: ino=%"PRId64"\n", ino));
   2342 		KASSERT(wl->wl_inohashcnt > 0);
   2343 		wl->wl_inohashcnt--;
   2344 		LIST_REMOVE(wi, wi_hash);
   2345 		mutex_exit(&wl->wl_mtx);
   2346 
   2347 		pool_put(&wapbl_ino_pool, wi);
   2348 	} else {
   2349 		mutex_exit(&wl->wl_mtx);
   2350 	}
   2351 }
   2352 
   2353 /****************************************************************/
   2354 
   2355 /*
   2356  * wapbl_transaction_inodes_len(wl)
   2357  *
   2358  *	Calculate the number of bytes required for inode registration
   2359  *	log records in wl.
   2360  */
   2361 static inline size_t
   2362 wapbl_transaction_inodes_len(struct wapbl *wl)
   2363 {
   2364 	int blocklen = 1<<wl->wl_log_dev_bshift;
   2365 	int iph;
   2366 
    2367 	/* Calculate number of inodes described in an inodelist header */
   2368 	iph = (blocklen - offsetof(struct wapbl_wc_inodelist, wc_inodes)) /
   2369 	    sizeof(((struct wapbl_wc_inodelist *)0)->wc_inodes[0]);
   2370 
   2371 	KASSERT(iph > 0);
   2372 
   2373 	return MAX(1, howmany(wl->wl_inohashcnt, iph)) * blocklen;
   2374 }
   2375 
   2376 
   2377 /*
   2378  * wapbl_transaction_len(wl)
   2379  *
   2380  *	Calculate number of bytes required for all log records in wl.
   2381  */
   2382 static size_t
   2383 wapbl_transaction_len(struct wapbl *wl)
   2384 {
   2385 	int blocklen = 1<<wl->wl_log_dev_bshift;
   2386 	size_t len;
   2387 
    2388 	/* Data bytes, plus blocklist and revocation headers, plus inodes */
   2389 	len = wl->wl_bcount;
   2390 	len += howmany(wl->wl_bufcount, wl->wl_brperjblock) * blocklen;
   2391 	len += howmany(wl->wl_dealloccnt, wl->wl_brperjblock) * blocklen;
   2392 	len += wapbl_transaction_inodes_len(wl);
   2393 
   2394 	return len;
   2395 }
   2396 
   2397 /*
   2398  * wapbl_cache_sync(wl, msg)
   2399  *
   2400  *	Issue DIOCCACHESYNC to wl->wl_devvp.
   2401  *
   2402  *	If sysctl(vfs.wapbl.verbose_commit) >= 2, print a message
   2403  *	including msg about the duration of the cache sync.
   2404  */
   2405 static int
   2406 wapbl_cache_sync(struct wapbl *wl, const char *msg)
   2407 {
   2408 	const bool verbose = wapbl_verbose_commit >= 2;
   2409 	struct bintime start_time;
   2410 	int force = 1;
   2411 	int error;
   2412 
   2413 	/* Skip full cache sync if disabled */
   2414 	if (!wapbl_flush_disk_cache) {
   2415 		return 0;
   2416 	}
   2417 	if (verbose) {
   2418 		bintime(&start_time);
   2419 	}
   2420 	error = VOP_IOCTL(wl->wl_devvp, DIOCCACHESYNC, &force,
   2421 	    FWRITE, FSCRED);
   2422 	if (error) {
   2423 		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
   2424 		    ("wapbl_cache_sync: DIOCCACHESYNC on dev 0x%jx "
   2425 			"returned %d\n", (uintmax_t)wl->wl_devvp->v_rdev,
   2426 			error));
   2427 	}
   2428 	if (verbose) {
   2429 		struct bintime d;
   2430 		struct timespec ts;
   2431 
   2432 		bintime(&d);
   2433 		bintime_sub(&d, &start_time);
   2434 		bintime2timespec(&d, &ts);
   2435 		printf("wapbl_cache_sync: %s: dev 0x%jx %ju.%09lu\n",
   2436 		    msg, (uintmax_t)wl->wl_devvp->v_rdev,
   2437 		    (uintmax_t)ts.tv_sec, ts.tv_nsec);
   2438 	}
   2439 
   2440 	wl->wl_ev_cacheflush.ev_count++;
   2441 
   2442 	return error;
   2443 }
   2444 
   2445 /*
   2446  * wapbl_write_commit(wl, head, tail)
   2447  *
   2448  *	Issue a disk cache sync to wait for all pending writes to the
   2449  *	log to complete, and then synchronously commit the current
   2450  *	circular queue head and tail to the log, in the next of two
   2451  *	locations for commit headers on disk.
   2452  *
   2453  *	Increment the generation number.  If the generation number
   2454  *	rolls over to zero, then a subsequent commit would appear to
   2455  *	have an older generation than this one -- in that case, issue a
   2456  *	duplicate commit to avoid this.
   2457  *
   2458  *	=> Caller must have exclusive access to wl, either by holding
   2459  *	wl->wl_rwlock for writer or by being wapbl_start before anyone
   2460  *	else has seen wl.
   2461  */
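         /*
          * Illustration (not part of the original comment, but implied by
          * the code below and by wapbl_replay_start()): commit headers
          * ping-pong between two slots keyed on generation parity,
          *
          *	generation 5 -> log block wl_logpbn + 1
          *	generation 6 -> log block wl_logpbn + 0
          *
          * so replay reads both slots and trusts whichever holds the larger
          * generation number.
          */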
   2462 static int
   2463 wapbl_write_commit(struct wapbl *wl, off_t head, off_t tail)
   2464 {
   2465 	struct wapbl_wc_header *wc = wl->wl_wc_header;
   2466 	struct timespec ts;
   2467 	int error;
   2468 	daddr_t pbn;
   2469 
   2470 	error = wapbl_buffered_flush(wl, true);
   2471 	if (error)
   2472 		return error;
   2473 	/*
    2474 	 * Flush disk cache to ensure that blocks we've written are actually
    2475 	 * written to stable storage before the commit header.
    2476 	 * This flushes to disk not only journal blocks, but also all
    2477 	 * metadata blocks written asynchronously since the previous commit.
   2478 	 *
   2479 	 * XXX Calc checksum here, instead we do this for now
   2480 	 */
   2481 	wapbl_cache_sync(wl, "1");
   2482 
   2483 	wc->wc_head = head;
   2484 	wc->wc_tail = tail;
   2485 	wc->wc_checksum = 0;
   2486 	wc->wc_version = 1;
   2487 	getnanotime(&ts);
   2488 	wc->wc_time = ts.tv_sec;
   2489 	wc->wc_timensec = ts.tv_nsec;
   2490 
   2491 	WAPBL_PRINTF(WAPBL_PRINT_WRITE,
    2492 	    ("wapbl_write_commit: head = %"PRIdMAX" tail = %"PRIdMAX"\n",
   2493 		(intmax_t)head, (intmax_t)tail));
   2494 
   2495 	/*
    2496 	 * Write the commit header.
    2497 	 *
    2498 	 * XXX if the generation will roll over, first zero the
    2499 	 * second commit header before trying to write both headers.
   2500 	 */
   2501 
   2502 	pbn = wl->wl_logpbn + (wc->wc_generation % 2);
   2503 #ifdef _KERNEL
   2504 	pbn = btodb(pbn << wc->wc_log_dev_bshift);
   2505 #endif
   2506 	error = wapbl_buffered_write(wc, wc->wc_len, wl, pbn,
   2507 	    WAPBL_JFLAGS(wl));
   2508 	if (error)
   2509 		return error;
   2510 	error = wapbl_buffered_flush(wl, true);
   2511 	if (error)
   2512 		return error;
   2513 
   2514 	/*
    2515 	 * Flush disk cache to ensure that the commit header is actually
    2516 	 * written before metadata blocks.  The commit block is written using
    2517 	 * FUA when enabled; in that case this flush is not needed.
   2518 	 */
   2519 	if (!WAPBL_USE_FUA(wl))
   2520 		wapbl_cache_sync(wl, "2");
   2521 
   2522 	/*
    2523 	 * If the generation number was zero, write it out a second time.
    2524 	 * This handles initialization and generation number rollover.
   2525 	 */
   2526 	if (wc->wc_generation++ == 0) {
   2527 		error = wapbl_write_commit(wl, head, tail);
   2528 		/*
    2529 		 * This panic could be removed if we do the
    2530 		 * zeroing mentioned above, and we are certain to roll
    2531 		 * back the generation number on failure.
   2532 		 */
   2533 		if (error) {
   2534 			panic("wapbl_write_commit: error writing duplicate "
   2535 			    "log header: %d", error);
   2536 		}
   2537 	}
   2538 
   2539 	wl->wl_ev_commit.ev_count++;
   2540 
   2541 	return 0;
   2542 }
   2543 
   2544 /*
   2545  * wapbl_write_blocks(wl, offp)
   2546  *
   2547  *	Write all pending physical blocks in the current transaction
   2548  *	from wapbl_add_buf to the log on disk, adding to the circular
   2549  *	queue head at byte offset *offp, and returning the new head's
   2550  *	byte offset in *offp.
   2551  */
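         /*
          * A sketch of the on-disk record each loop iteration below emits
          * (inferred from the code, not normative):
          *
          *	[ WAPBL_WC_BLOCKS header | buf 0 data | ... | buf N-1 data | pad ]
          *
          * The header fills one log block, at most wl_brperjblock bufs are
          * described per header, and zero padding rounds wc_len up to a
          * multiple of the log block size.
          */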
   2552 static int
   2553 wapbl_write_blocks(struct wapbl *wl, off_t *offp)
   2554 {
   2555 	struct wapbl_wc_blocklist *wc =
   2556 	    (struct wapbl_wc_blocklist *)wl->wl_wc_scratch;
   2557 	int blocklen = 1<<wl->wl_log_dev_bshift;
   2558 	struct buf *bp;
   2559 	off_t off = *offp;
   2560 	int error;
   2561 	size_t padding;
   2562 
   2563 	KASSERT(rw_write_held(&wl->wl_rwlock));
   2564 
   2565 	bp = TAILQ_FIRST(&wl->wl_bufs);
   2566 
   2567 	while (bp) {
   2568 		int cnt;
   2569 		struct buf *obp = bp;
   2570 
   2571 		KASSERT(bp->b_flags & B_LOCKED);
   2572 
   2573 		wc->wc_type = WAPBL_WC_BLOCKS;
   2574 		wc->wc_len = blocklen;
   2575 		wc->wc_blkcount = 0;
   2576 		wc->wc_unused = 0;
   2577 		while (bp && wc->wc_blkcount < wl->wl_brperjblock) {
   2578 			/*
   2579 			 * Make sure all the physical block numbers are up to
   2580 			 * date.  If this is not always true on a given
   2581 			 * filesystem, then VOP_BMAP must be called.  We
   2582 			 * could call VOP_BMAP here, or else in the filesystem
   2583 			 * specific flush callback, although neither of those
   2584 			 * solutions allow us to take the vnode lock.  If a
   2585 			 * filesystem requires that we must take the vnode lock
   2586 			 * to call VOP_BMAP, then we can probably do it in
   2587 			 * bwrite when the vnode lock should already be held
   2588 			 * by the invoking code.
   2589 			 */
   2590 			KASSERT(bp->b_vp->v_type == VBLK ||
   2591 			    bp->b_blkno != bp->b_lblkno);
   2592 			KASSERT(bp->b_blkno > 0);
   2593 
   2594 			wc->wc_blocks[wc->wc_blkcount].wc_daddr = bp->b_blkno;
   2595 			wc->wc_blocks[wc->wc_blkcount].wc_dlen = bp->b_bcount;
   2596 			wc->wc_len += bp->b_bcount;
   2597 			wc->wc_blkcount++;
   2598 			bp = TAILQ_NEXT(bp, b_wapbllist);
   2599 		}
   2600 		if (wc->wc_len % blocklen != 0) {
   2601 			padding = blocklen - wc->wc_len % blocklen;
   2602 			wc->wc_len += padding;
   2603 		} else {
   2604 			padding = 0;
   2605 		}
   2606 
   2607 		WAPBL_PRINTF(WAPBL_PRINT_WRITE,
   2608 		    ("wapbl_write_blocks:"
   2609 			" len = %u (padding %zu) off = %"PRIdMAX"\n",
   2610 			wc->wc_len, padding, (intmax_t)off));
   2611 
   2612 		error = wapbl_circ_write(wl, wc, blocklen, &off);
   2613 		if (error)
   2614 			return error;
   2615 		bp = obp;
   2616 		cnt = 0;
   2617 		while (bp && cnt++ < wl->wl_brperjblock) {
   2618 			error = wapbl_circ_write(wl, bp->b_data,
   2619 			    bp->b_bcount, &off);
   2620 			if (error)
   2621 				return error;
   2622 			bp = TAILQ_NEXT(bp, b_wapbllist);
   2623 		}
   2624 		if (padding) {
   2625 			void *zero;
   2626 
   2627 			zero = wapbl_alloc(padding);
   2628 			memset(zero, 0, padding);
   2629 			error = wapbl_circ_write(wl, zero, padding, &off);
   2630 			wapbl_free(zero, padding);
   2631 			if (error)
   2632 				return error;
   2633 		}
   2634 	}
   2635 	*offp = off;
   2636 	return 0;
   2637 }
   2638 
   2639 /*
   2640  * wapbl_write_revocations(wl, offp)
   2641  *
   2642  *	Write all pending deallocations in the current transaction from
   2643  *	wapbl_register_deallocation to the log on disk, adding to the
   2644  *	circular queue's head at byte offset *offp, and returning the
   2645  *	new head's byte offset in *offp.
   2646  */
   2647 static int
   2648 wapbl_write_revocations(struct wapbl *wl, off_t *offp)
   2649 {
   2650 	struct wapbl_wc_blocklist *wc =
   2651 	    (struct wapbl_wc_blocklist *)wl->wl_wc_scratch;
   2652 	struct wapbl_dealloc *wd, *lwd;
   2653 	int blocklen = 1<<wl->wl_log_dev_bshift;
   2654 	off_t off = *offp;
   2655 	int error;
   2656 
   2657 	KASSERT(rw_write_held(&wl->wl_rwlock));
   2658 
   2659 	if (wl->wl_dealloccnt == 0)
   2660 		return 0;
   2661 
   2662 	while ((wd = TAILQ_FIRST(&wl->wl_dealloclist)) != NULL) {
   2663 		wc->wc_type = WAPBL_WC_REVOCATIONS;
   2664 		wc->wc_len = blocklen;
   2665 		wc->wc_blkcount = 0;
   2666 		wc->wc_unused = 0;
   2667 		while (wd && wc->wc_blkcount < wl->wl_brperjblock) {
   2668 			wc->wc_blocks[wc->wc_blkcount].wc_daddr =
   2669 			    wd->wd_blkno;
   2670 			wc->wc_blocks[wc->wc_blkcount].wc_dlen =
   2671 			    wd->wd_len;
   2672 			wc->wc_blkcount++;
   2673 
   2674 			wd = TAILQ_NEXT(wd, wd_entries);
   2675 		}
   2676 		WAPBL_PRINTF(WAPBL_PRINT_WRITE,
   2677 		    ("wapbl_write_revocations: len = %u off = %"PRIdMAX"\n",
   2678 			wc->wc_len, (intmax_t)off));
   2679 		error = wapbl_circ_write(wl, wc, blocklen, &off);
   2680 		if (error)
   2681 			return error;
   2682 
   2683 		/* free all successfully written deallocs */
   2684 		lwd = wd;
   2685 		while ((wd = TAILQ_FIRST(&wl->wl_dealloclist)) != NULL) {
   2686 			if (wd == lwd)
   2687 				break;
   2688 			wapbl_deallocation_free(wl, wd, true);
   2689 		}
   2690 	}
   2691 	*offp = off;
   2692 	return 0;
   2693 }
   2694 
   2695 /*
   2696  * wapbl_write_inodes(wl, offp)
   2697  *
   2698  *	Write all pending inode allocations in the current transaction
   2699  *	from wapbl_register_inode to the log on disk, adding to the
   2700  *	circular queue's head at byte offset *offp and returning the
   2701  *	new head's byte offset in *offp.
   2702  */
   2703 static int
   2704 wapbl_write_inodes(struct wapbl *wl, off_t *offp)
   2705 {
   2706 	struct wapbl_wc_inodelist *wc =
   2707 	    (struct wapbl_wc_inodelist *)wl->wl_wc_scratch;
   2708 	int i;
   2709 	int blocklen = 1 << wl->wl_log_dev_bshift;
   2710 	off_t off = *offp;
   2711 	int error;
   2712 
   2713 	struct wapbl_ino_head *wih;
   2714 	struct wapbl_ino *wi;
   2715 	int iph;
   2716 
   2717 	iph = (blocklen - offsetof(struct wapbl_wc_inodelist, wc_inodes)) /
   2718 	    sizeof(((struct wapbl_wc_inodelist *)0)->wc_inodes[0]);
   2719 
   2720 	i = 0;
   2721 	wih = &wl->wl_inohash[0];
   2722 	wi = 0;
   2723 	do {
   2724 		wc->wc_type = WAPBL_WC_INODES;
   2725 		wc->wc_len = blocklen;
   2726 		wc->wc_inocnt = 0;
   2727 		wc->wc_clear = (i == 0);
   2728 		while (i < wl->wl_inohashcnt && wc->wc_inocnt < iph) {
   2729 			while (!wi) {
   2730 				KASSERT((wih - &wl->wl_inohash[0])
   2731 				    <= wl->wl_inohashmask);
   2732 				wi = LIST_FIRST(wih++);
   2733 			}
   2734 			wc->wc_inodes[wc->wc_inocnt].wc_inumber = wi->wi_ino;
   2735 			wc->wc_inodes[wc->wc_inocnt].wc_imode = wi->wi_mode;
   2736 			wc->wc_inocnt++;
   2737 			i++;
   2738 			wi = LIST_NEXT(wi, wi_hash);
   2739 		}
   2740 		WAPBL_PRINTF(WAPBL_PRINT_WRITE,
   2741 		    ("wapbl_write_inodes: len = %u off = %"PRIdMAX"\n",
   2742 			wc->wc_len, (intmax_t)off));
   2743 		error = wapbl_circ_write(wl, wc, blocklen, &off);
   2744 		if (error)
   2745 			return error;
   2746 	} while (i < wl->wl_inohashcnt);
   2747 
   2748 	*offp = off;
   2749 	return 0;
   2750 }
   2751 
   2752 #endif /* _KERNEL */
   2753 
   2754 /****************************************************************/
   2755 
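         /*
          * Replay block hash: maps a block's final disk address to the
          * offset in the log of its newest copy, so that replay writes each
          * block at most once, with its most recent contents
          * (wapbl_blkhash_ins() overwrites wb_off on a duplicate insert).
          */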
   2756 struct wapbl_blk {
   2757 	LIST_ENTRY(wapbl_blk) wb_hash;
   2758 	daddr_t wb_blk;
   2759 	off_t wb_off; /* Offset of this block in the log */
   2760 };
   2761 #define	WAPBL_BLKPOOL_MIN 83
   2762 
   2763 static void
   2764 wapbl_blkhash_init(struct wapbl_replay *wr, u_int size)
   2765 {
   2766 
   2767 	if (size < WAPBL_BLKPOOL_MIN)
   2768 		size = WAPBL_BLKPOOL_MIN;
   2769 	KASSERT(wr->wr_blkhash == 0);
   2770 #ifdef _KERNEL
   2771 	wr->wr_blkhash = hashinit(size, HASH_LIST, true, &wr->wr_blkhashmask);
   2772 #else /* ! _KERNEL */
   2773 	/* Manually implement hashinit */
   2774 	{
   2775 		unsigned long i, hashsize;
   2776 
   2777 		for (hashsize = 1; hashsize < size; hashsize <<= 1)
   2778 			continue;
   2779 		wr->wr_blkhash = wapbl_alloc(hashsize *
   2780 		    sizeof(*wr->wr_blkhash));
   2781 		for (i = 0; i < hashsize; i++)
   2782 			LIST_INIT(&wr->wr_blkhash[i]);
   2783 		wr->wr_blkhashmask = hashsize - 1;
   2784 	}
   2785 #endif /* ! _KERNEL */
   2786 }
   2787 
   2788 static void
   2789 wapbl_blkhash_free(struct wapbl_replay *wr)
   2790 {
   2791 
   2792 	KASSERT(wr->wr_blkhashcnt == 0);
   2793 #ifdef _KERNEL
   2794 	hashdone(wr->wr_blkhash, HASH_LIST, wr->wr_blkhashmask);
   2795 #else /* ! _KERNEL */
   2796 	wapbl_free(wr->wr_blkhash,
   2797 	    (wr->wr_blkhashmask + 1) * sizeof(*wr->wr_blkhash));
   2798 #endif /* ! _KERNEL */
   2799 }
   2800 
   2801 static struct wapbl_blk *
   2802 wapbl_blkhash_get(struct wapbl_replay *wr, daddr_t blk)
   2803 {
   2804 	struct wapbl_blk_head *wbh;
   2805 	struct wapbl_blk *wb;
   2806 
   2807 	wbh = &wr->wr_blkhash[blk & wr->wr_blkhashmask];
   2808 	LIST_FOREACH(wb, wbh, wb_hash) {
   2809 		if (blk == wb->wb_blk)
   2810 			return wb;
   2811 	}
   2812 	return 0;
   2813 }
   2814 
   2815 static void
   2816 wapbl_blkhash_ins(struct wapbl_replay *wr, daddr_t blk, off_t off)
   2817 {
   2818 	struct wapbl_blk_head *wbh;
   2819 	struct wapbl_blk *wb;
   2820 
   2821 	wb = wapbl_blkhash_get(wr, blk);
   2822 	if (wb) {
   2823 		KASSERT(wb->wb_blk == blk);
   2824 		wb->wb_off = off;
   2825 	} else {
   2826 		wb = wapbl_alloc(sizeof(*wb));
   2827 		wb->wb_blk = blk;
   2828 		wb->wb_off = off;
   2829 		wbh = &wr->wr_blkhash[blk & wr->wr_blkhashmask];
   2830 		LIST_INSERT_HEAD(wbh, wb, wb_hash);
   2831 		wr->wr_blkhashcnt++;
   2832 	}
   2833 }
   2834 
   2835 static void
   2836 wapbl_blkhash_rem(struct wapbl_replay *wr, daddr_t blk)
   2837 {
   2838 	struct wapbl_blk *wb = wapbl_blkhash_get(wr, blk);
   2839 
   2840 	if (wb) {
   2841 		KASSERT(wr->wr_blkhashcnt > 0);
   2842 		wr->wr_blkhashcnt--;
   2843 		LIST_REMOVE(wb, wb_hash);
   2844 		wapbl_free(wb, sizeof(*wb));
   2845 	}
   2846 }
   2847 
   2848 static void
   2849 wapbl_blkhash_clear(struct wapbl_replay *wr)
   2850 {
   2851 	unsigned long i;
   2852 
   2853 	for (i = 0; i <= wr->wr_blkhashmask; i++) {
   2854 		struct wapbl_blk *wb;
   2855 
   2856 		while ((wb = LIST_FIRST(&wr->wr_blkhash[i]))) {
   2857 			KASSERT(wr->wr_blkhashcnt > 0);
   2858 			wr->wr_blkhashcnt--;
   2859 			LIST_REMOVE(wb, wb_hash);
   2860 			wapbl_free(wb, sizeof(*wb));
   2861 		}
   2862 	}
   2863 	KASSERT(wr->wr_blkhashcnt == 0);
   2864 }
   2865 
   2866 /****************************************************************/
   2867 
   2868 /*
   2869  * wapbl_circ_read(wr, data, len, offp)
   2870  *
   2871  *	Read len bytes into data from the circular queue of wr,
   2872  *	starting at the linear byte offset *offp, and returning the new
   2873  *	linear byte offset in *offp.
   2874  *
   2875  *	If the starting linear byte offset precedes wr->wr_circ_off,
   2876  *	the read instead begins at wr->wr_circ_off.  XXX WTF?  This
   2877  *	should be a KASSERT, not a conditional.
   2878  */
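         /*
          * Worked example (illustrative numbers, not from the source): with
          * wr_circ_off = 1 KB and wr_circ_size = 8 KB the valid region is
          * [1 KB, 9 KB).  A 2 KB read starting at *offp = 8 KB splits into
          * 1 KB at [8 KB, 9 KB) followed by 1 KB at [1 KB, 2 KB), and
          * returns with *offp = 2 KB.
          */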
   2879 static int
   2880 wapbl_circ_read(struct wapbl_replay *wr, void *data, size_t len, off_t *offp)
   2881 {
   2882 	size_t slen;
   2883 	off_t off = *offp;
   2884 	int error;
   2885 	daddr_t pbn;
   2886 
   2887 	KASSERT(((len >> wr->wr_log_dev_bshift) << wr->wr_log_dev_bshift) ==
   2888 	    len);
   2889 
   2890 	if (off < wr->wr_circ_off)
   2891 		off = wr->wr_circ_off;
   2892 	slen = wr->wr_circ_off + wr->wr_circ_size - off;
   2893 	if (slen < len) {
   2894 		pbn = wr->wr_logpbn + (off >> wr->wr_log_dev_bshift);
   2895 #ifdef _KERNEL
   2896 		pbn = btodb(pbn << wr->wr_log_dev_bshift);
   2897 #endif
   2898 		error = wapbl_read(data, slen, wr->wr_devvp, pbn);
   2899 		if (error)
   2900 			return error;
   2901 		data = (uint8_t *)data + slen;
   2902 		len -= slen;
   2903 		off = wr->wr_circ_off;
   2904 	}
   2905 	pbn = wr->wr_logpbn + (off >> wr->wr_log_dev_bshift);
   2906 #ifdef _KERNEL
   2907 	pbn = btodb(pbn << wr->wr_log_dev_bshift);
   2908 #endif
   2909 	error = wapbl_read(data, len, wr->wr_devvp, pbn);
   2910 	if (error)
   2911 		return error;
   2912 	off += len;
   2913 	if (off >= wr->wr_circ_off + wr->wr_circ_size)
   2914 		off = wr->wr_circ_off;
   2915 	*offp = off;
   2916 	return 0;
   2917 }
   2918 
   2919 /*
   2920  * wapbl_circ_advance(wr, len, offp)
   2921  *
   2922  *	Compute the linear byte offset of the circular queue of wr that
   2923  *	is len bytes past *offp, and store it in *offp.
   2924  *
   2925  *	This is as if wapbl_circ_read, but without actually reading
   2926  *	anything.
   2927  *
   2928  *	If the starting linear byte offset precedes wr->wr_circ_off, it
   2929  *	is taken to be wr->wr_circ_off instead.  XXX WTF?  This should
   2930  *	be a KASSERT, not a conditional.
   2931  */
   2932 static void
   2933 wapbl_circ_advance(struct wapbl_replay *wr, size_t len, off_t *offp)
   2934 {
   2935 	size_t slen;
   2936 	off_t off = *offp;
   2937 
   2938 	KASSERT(((len >> wr->wr_log_dev_bshift) << wr->wr_log_dev_bshift) ==
   2939 	    len);
   2940 
   2941 	if (off < wr->wr_circ_off)
   2942 		off = wr->wr_circ_off;
   2943 	slen = wr->wr_circ_off + wr->wr_circ_size - off;
   2944 	if (slen < len) {
   2945 		len -= slen;
   2946 		off = wr->wr_circ_off;
   2947 	}
   2948 	off += len;
   2949 	if (off >= wr->wr_circ_off + wr->wr_circ_size)
   2950 		off = wr->wr_circ_off;
   2951 	*offp = off;
   2952 }
   2953 
   2954 /****************************************************************/
   2955 
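         /*
          * wapbl_replay_start(wrp, vp, off, count, blksize)
          *
          *	Open the log for replay: map the log's location on the
          *	underlying device, read the two commit headers, pick the one
          *	with the newer generation, and scan the log from tail to head
          *	to build the in-memory state for replay.  On success, return
          *	zero and store the replay state in *wrp.
          */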
   2956 int
   2957 wapbl_replay_start(struct wapbl_replay **wrp, struct vnode *vp,
   2958     daddr_t off, size_t count, size_t blksize)
   2959 {
   2960 	struct wapbl_replay *wr;
   2961 	int error;
   2962 	struct vnode *devvp;
   2963 	daddr_t logpbn;
   2964 	uint8_t *scratch;
   2965 	struct wapbl_wc_header *wch;
   2966 	struct wapbl_wc_header *wch2;
   2967 	/* Use this until we read the actual log header */
   2968 	int log_dev_bshift = ilog2(blksize);
   2969 	size_t used;
   2970 	daddr_t pbn;
   2971 
   2972 	WAPBL_PRINTF(WAPBL_PRINT_REPLAY,
   2973 	    ("wapbl_replay_start: vp=%p off=%"PRId64" count=%zu blksize=%zu\n",
   2974 		vp, off, count, blksize));
   2975 
   2976 	if (off < 0)
   2977 		return EINVAL;
   2978 
   2979 	if (blksize < DEV_BSIZE)
   2980 		return EINVAL;
   2981 	if (blksize % DEV_BSIZE)
   2982 		return EINVAL;
   2983 
   2984 #ifdef _KERNEL
   2985 #if 0
   2986 	/* XXX vp->v_size isn't reliably set for VBLK devices,
   2987 	 * especially root.  However, we might still want to verify
   2988 	 * that the full load is readable */
   2989 	if ((off + count) * blksize > vp->v_size)
   2990 		return EINVAL;
   2991 #endif
   2992 	if ((error = VOP_BMAP(vp, off, &devvp, &logpbn, 0)) != 0) {
   2993 		return error;
   2994 	}
   2995 #else /* ! _KERNEL */
   2996 	devvp = vp;
   2997 	logpbn = off;
   2998 #endif /* ! _KERNEL */
   2999 
   3000 	scratch = wapbl_alloc(MAXBSIZE);
   3001 
   3002 	pbn = logpbn;
   3003 #ifdef _KERNEL
   3004 	pbn = btodb(pbn << log_dev_bshift);
   3005 #endif
   3006 	error = wapbl_read(scratch, 2<<log_dev_bshift, devvp, pbn);
   3007 	if (error)
   3008 		goto errout;
   3009 
   3010 	wch = (struct wapbl_wc_header *)scratch;
   3011 	wch2 =
   3012 	    (struct wapbl_wc_header *)(scratch + (1<<log_dev_bshift));
   3013 	/* XXX verify checksums and magic numbers */
   3014 	if (wch->wc_type != WAPBL_WC_HEADER) {
   3015 		printf("Unrecognized wapbl magic: 0x%08x\n", wch->wc_type);
   3016 		error = EFTYPE;
   3017 		goto errout;
   3018 	}
   3019 
   3020 	if (wch2->wc_generation > wch->wc_generation)
   3021 		wch = wch2;
   3022 
   3023 	wr = wapbl_calloc(1, sizeof(*wr));
   3024 
   3025 	wr->wr_logvp = vp;
   3026 	wr->wr_devvp = devvp;
   3027 	wr->wr_logpbn = logpbn;
   3028 
   3029 	wr->wr_scratch = scratch;
   3030 
   3031 	wr->wr_log_dev_bshift = wch->wc_log_dev_bshift;
   3032 	wr->wr_fs_dev_bshift = wch->wc_fs_dev_bshift;
   3033 	wr->wr_circ_off = wch->wc_circ_off;
   3034 	wr->wr_circ_size = wch->wc_circ_size;
   3035 	wr->wr_generation = wch->wc_generation;
   3036 
   3037 	used = wapbl_space_used(wch->wc_circ_size, wch->wc_head, wch->wc_tail);
   3038 
   3039 	WAPBL_PRINTF(WAPBL_PRINT_REPLAY,
   3040 	    ("wapbl_replay: head=%"PRId64" tail=%"PRId64" off=%"PRId64
   3041 		" len=%"PRId64" used=%zu\n",
   3042 		wch->wc_head, wch->wc_tail, wch->wc_circ_off,
   3043 		wch->wc_circ_size, used));
   3044 
   3045 	wapbl_blkhash_init(wr, (used >> wch->wc_fs_dev_bshift));
   3046 
   3047 	error = wapbl_replay_process(wr, wch->wc_head, wch->wc_tail);
   3048 	if (error) {
   3049 		wapbl_replay_stop(wr);
   3050 		wapbl_replay_free(wr);
   3051 		return error;
   3052 	}
   3053 
   3054 	*wrp = wr;
   3055 	return 0;
   3056 
   3057 errout:
   3058 	wapbl_free(scratch, MAXBSIZE);
   3059 	return error;
   3060 }
   3061 
   3062 void
   3063 wapbl_replay_stop(struct wapbl_replay *wr)
   3064 {
   3065 
   3066 	if (!wapbl_replay_isopen(wr))
   3067 		return;
   3068 
   3069 	WAPBL_PRINTF(WAPBL_PRINT_REPLAY, ("wapbl_replay_stop called\n"));
   3070 
   3071 	wapbl_free(wr->wr_scratch, MAXBSIZE);
   3072 	wr->wr_scratch = NULL;
   3073 
   3074 	wr->wr_logvp = NULL;
   3075 
   3076 	wapbl_blkhash_clear(wr);
   3077 	wapbl_blkhash_free(wr);
   3078 }
   3079 
   3080 void
   3081 wapbl_replay_free(struct wapbl_replay *wr)
   3082 {
   3083 
   3084 	KDASSERT(!wapbl_replay_isopen(wr));
   3085 
   3086 	if (wr->wr_inodes) {
   3087 		wapbl_free(wr->wr_inodes,
   3088 		    wr->wr_inodescnt * sizeof(wr->wr_inodes[0]));
   3089 	}
   3090 	wapbl_free(wr, sizeof(*wr));
   3091 }
   3092 
   3093 #ifdef _KERNEL
   3094 int
   3095 wapbl_replay_isopen1(struct wapbl_replay *wr)
   3096 {
   3097 
   3098 	return wapbl_replay_isopen(wr);
   3099 }
   3100 #endif
   3101 
   3102 /*
    3103  * Calculate the disk address for the i'th block in the wc_blocklist,
    3104  * offset by j blocks of size blen.
   3105  *
   3106  * wc_daddr is always a kernel disk address in DEV_BSIZE units that
   3107  * was written to the journal.
   3108  *
   3109  * The kernel needs that address plus the offset in DEV_BSIZE units.
   3110  *
   3111  * Userland needs that address plus the offset in blen units.
   3112  *
   3113  */
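         /*
          * Example (illustrative, assuming DEV_BSIZE is 512): with
          * blen = 2048 and j = 1, the kernel advances wc_daddr by
          * btodb(2048) = 4 sectors, while userland first converts wc_daddr
          * to 2048-byte units with dbtob() and then adds 1.
          */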
   3114 static daddr_t
   3115 wapbl_block_daddr(struct wapbl_wc_blocklist *wc, int i, int j, int blen)
   3116 {
   3117 	daddr_t pbn;
   3118 
   3119 #ifdef _KERNEL
   3120 	pbn = wc->wc_blocks[i].wc_daddr + btodb(j * blen);
   3121 #else
   3122 	pbn = dbtob(wc->wc_blocks[i].wc_daddr) / blen + j;
   3123 #endif
   3124 
   3125 	return pbn;
   3126 }
   3127 
   3128 static void
   3129 wapbl_replay_process_blocks(struct wapbl_replay *wr, off_t *offp)
   3130 {
   3131 	struct wapbl_wc_blocklist *wc =
   3132 	    (struct wapbl_wc_blocklist *)wr->wr_scratch;
   3133 	int fsblklen = 1 << wr->wr_fs_dev_bshift;
   3134 	int i, j, n;
   3135 
   3136 	for (i = 0; i < wc->wc_blkcount; i++) {
   3137 		/*
   3138 		 * Enter each physical block into the hashtable independently.
   3139 		 */
   3140 		n = wc->wc_blocks[i].wc_dlen >> wr->wr_fs_dev_bshift;
   3141 		for (j = 0; j < n; j++) {
   3142 			wapbl_blkhash_ins(wr,
   3143 			    wapbl_block_daddr(wc, i, j, fsblklen),
   3144 			    *offp);
   3145 			wapbl_circ_advance(wr, fsblklen, offp);
   3146 		}
   3147 	}
   3148 }
   3149 
   3150 static void
   3151 wapbl_replay_process_revocations(struct wapbl_replay *wr)
   3152 {
   3153 	struct wapbl_wc_blocklist *wc =
   3154 	    (struct wapbl_wc_blocklist *)wr->wr_scratch;
   3155 	int fsblklen = 1 << wr->wr_fs_dev_bshift;
   3156 	int i, j, n;
   3157 
   3158 	for (i = 0; i < wc->wc_blkcount; i++) {
   3159 		/*
   3160 		 * Remove any blocks found from the hashtable.
   3161 		 */
   3162 		n = wc->wc_blocks[i].wc_dlen >> wr->wr_fs_dev_bshift;
   3163 		for (j = 0; j < n; j++) {
   3164 			wapbl_blkhash_rem(wr, wapbl_block_daddr(wc, i, j,
   3165 				fsblklen));
   3166 		}
   3167 	}
   3168 }
   3169 
   3170 static void
   3171 wapbl_replay_process_inodes(struct wapbl_replay *wr, off_t oldoff,
   3172     off_t newoff)
   3173 {
   3174 	struct wapbl_wc_inodelist *wc =
   3175 	    (struct wapbl_wc_inodelist *)wr->wr_scratch;
   3176 	void *new_inodes;
   3177 	const size_t oldsize = wr->wr_inodescnt * sizeof(wr->wr_inodes[0]);
   3178 
   3179 	KASSERT(sizeof(wr->wr_inodes[0]) == sizeof(wc->wc_inodes[0]));
   3180 
   3181 	/*
    3182 	 * Keep track of where we found this so the location won't be
   3183 	 * overwritten.
   3184 	 */
   3185 	if (wc->wc_clear) {
   3186 		wr->wr_inodestail = oldoff;
   3187 		wr->wr_inodescnt = 0;
   3188 		if (wr->wr_inodes != NULL) {
   3189 			wapbl_free(wr->wr_inodes, oldsize);
   3190 			wr->wr_inodes = NULL;
   3191 		}
   3192 	}
   3193 	wr->wr_inodeshead = newoff;
   3194 	if (wc->wc_inocnt == 0)
   3195 		return;
   3196 
   3197 	new_inodes = wapbl_alloc((wr->wr_inodescnt + wc->wc_inocnt) *
   3198 	    sizeof(wr->wr_inodes[0]));
   3199 	if (wr->wr_inodes != NULL) {
   3200 		memcpy(new_inodes, wr->wr_inodes, oldsize);
   3201 		wapbl_free(wr->wr_inodes, oldsize);
   3202 	}
   3203 	wr->wr_inodes = new_inodes;
   3204 	memcpy(&wr->wr_inodes[wr->wr_inodescnt], wc->wc_inodes,
   3205 	    wc->wc_inocnt * sizeof(wr->wr_inodes[0]));
   3206 	wr->wr_inodescnt += wc->wc_inocnt;
   3207 }
   3208 
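         /*
          * wapbl_replay_process(wr, head, tail)
          *
          *	Scan the log records from tail to head, building the replay
          *	block hash from blocklists, pruning it with revocations, and
          *	accumulating registered inodes.  Return EFTYPE if a record
          *	type is unrecognized or the record lengths do not add up.
          */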
   3209 static int
   3210 wapbl_replay_process(struct wapbl_replay *wr, off_t head, off_t tail)
   3211 {
   3212 	off_t off;
   3213 	int error;
   3214 
   3215 	int logblklen = 1 << wr->wr_log_dev_bshift;
   3216 
   3217 	wapbl_blkhash_clear(wr);
   3218 
   3219 	off = tail;
   3220 	while (off != head) {
   3221 		struct wapbl_wc_null *wcn;
   3222 		off_t saveoff = off;
   3223 		error = wapbl_circ_read(wr, wr->wr_scratch, logblklen, &off);
   3224 		if (error)
   3225 			goto errout;
   3226 		wcn = (struct wapbl_wc_null *)wr->wr_scratch;
   3227 		switch (wcn->wc_type) {
   3228 		case WAPBL_WC_BLOCKS:
   3229 			wapbl_replay_process_blocks(wr, &off);
   3230 			break;
   3231 
   3232 		case WAPBL_WC_REVOCATIONS:
   3233 			wapbl_replay_process_revocations(wr);
   3234 			break;
   3235 
   3236 		case WAPBL_WC_INODES:
   3237 			wapbl_replay_process_inodes(wr, saveoff, off);
   3238 			break;
   3239 
   3240 		default:
   3241 			printf("Unrecognized wapbl type: 0x%08x\n",
   3242 			    wcn->wc_type);
   3243 			error = EFTYPE;
   3244 			goto errout;
   3245 		}
   3246 		wapbl_circ_advance(wr, wcn->wc_len, &saveoff);
   3247 		if (off != saveoff) {
   3248 			printf("wapbl_replay: corrupted records\n");
   3249 			error = EFTYPE;
   3250 			goto errout;
   3251 		}
   3252 	}
   3253 	return 0;
   3254 
   3255 errout:
   3256 	wapbl_blkhash_clear(wr);
   3257 	return error;
   3258 }
   3259 
   3260 #if 0
   3261 int
   3262 wapbl_replay_verify(struct wapbl_replay *wr, struct vnode *fsdevvp)
   3263 {
   3264 	off_t off;
   3265 	int mismatchcnt = 0;
   3266 	int logblklen = 1 << wr->wr_log_dev_bshift;
   3267 	int fsblklen = 1 << wr->wr_fs_dev_bshift;
   3268 	void *scratch1 = wapbl_alloc(MAXBSIZE);
   3269 	void *scratch2 = wapbl_alloc(MAXBSIZE);
   3270 	int error = 0;
   3271 
   3272 	KDASSERT(wapbl_replay_isopen(wr));
   3273 
   3274 	off = wch->wc_tail;
   3275 	while (off != wch->wc_head) {
   3276 		struct wapbl_wc_null *wcn;
   3277 #ifdef DEBUG
   3278 		off_t saveoff = off;
   3279 #endif
   3280 		error = wapbl_circ_read(wr, wr->wr_scratch, logblklen, &off);
   3281 		if (error)
   3282 			goto out;
   3283 		wcn = (struct wapbl_wc_null *)wr->wr_scratch;
   3284 		switch (wcn->wc_type) {
   3285 		case WAPBL_WC_BLOCKS: {
   3286 			struct wapbl_wc_blocklist *wc =
   3287 			    (struct wapbl_wc_blocklist *)wr->wr_scratch;
   3288 			int i;
   3289 			for (i = 0; i < wc->wc_blkcount; i++) {
   3290 				int foundcnt = 0;
   3291 				int dirtycnt = 0;
   3292 				int j, n;
   3293 				/*
    3294 				 * Check each physical block against the
    3295 				 * hashtable independently.
   3296 				 */
   3297 				n = wc->wc_blocks[i].wc_dlen >>
   3298 				    wch->wc_fs_dev_bshift;
				for (j = 0; j < n; j++) {
					struct wapbl_blk *wb =
					    wapbl_blkhash_get(wr,
						wapbl_block_daddr(wc, i, j,
						    fsblklen));
					if (wb && wb->wb_off == off) {
						foundcnt++;
						error =
						    wapbl_circ_read(wr,
							scratch1, fsblklen,
							&off);
						if (error)
							goto out;
						error =
						    wapbl_read(scratch2,
							fsblklen, fsdevvp,
							wb->wb_blk);
						if (error)
							goto out;
						if (memcmp(scratch1,
							scratch2,
							fsblklen)) {
							printf("wapbl_verify:"
							    " mismatch block"
							    " %"PRId64
							    " at off"
							    " %"PRIdMAX"\n",
							    wb->wb_blk,
							    (intmax_t)off);
							dirtycnt++;
							mismatchcnt++;
						}
					} else {
						wapbl_circ_advance(wr,
						    fsblklen, &off);
					}
				}
#if 0
				/*
				 * If all of the blocks in an entry
				 * are clean, then remove all of its
				 * blocks from the hashtable since they
				 * will never need replay.
				 */
				if (foundcnt != 0 && dirtycnt == 0) {
					off = saveoff;
					wapbl_circ_advance(wr, logblklen,
					    &off);
					for (j = 0; j < n; j++) {
						struct wapbl_blk *wb =
						    wapbl_blkhash_get(wr,
							wapbl_block_daddr(wc,
							    i, j, fsblklen));
						if (wb &&
						    (wb->wb_off == off)) {
							wapbl_blkhash_rem(wr,
							    wb->wb_blk);
						}
						wapbl_circ_advance(wr,
						    fsblklen, &off);
					}
				}
#endif
			}
		}
			break;
		case WAPBL_WC_REVOCATIONS:
		case WAPBL_WC_INODES:
			break;
		default:
			KASSERT(0);
		}
#ifdef DEBUG
		wapbl_circ_advance(wr, wcn->wc_len, &saveoff);
		KASSERT(off == saveoff);
#endif
	}
out:
	wapbl_free(scratch1, MAXBSIZE);
	wapbl_free(scratch2, MAXBSIZE);
	if (!error && mismatchcnt)
		error = EFTYPE;
	return error;
}
#endif

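/*
 * wapbl_replay_write(wr, fsdevvp)
 *
 *	Write the final version of every journalled block recorded in
 *	the block hash to its home location on the file system device,
 *	rolling the log forward onto fsdevvp.
 */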
int
wapbl_replay_write(struct wapbl_replay *wr, struct vnode *fsdevvp)
{
	struct wapbl_blk *wb;
	size_t i;
	off_t off;
	void *scratch;
	int error = 0;
	int fsblklen = 1 << wr->wr_fs_dev_bshift;

	KDASSERT(wapbl_replay_isopen(wr));

	scratch = wapbl_alloc(MAXBSIZE);

	for (i = 0; i <= wr->wr_blkhashmask; ++i) {
		LIST_FOREACH(wb, &wr->wr_blkhash[i], wb_hash) {
			off = wb->wb_off;
			error = wapbl_circ_read(wr, scratch, fsblklen, &off);
			if (error)
				break;
			error = wapbl_write(scratch, fsblklen, fsdevvp,
			    wb->wb_blk);
			if (error)
				break;
		}
		/*
		 * Stop at the first failure; continuing with the
		 * remaining hash chains would clobber "error".
		 */
		if (error)
			break;
	}

	wapbl_free(scratch, MAXBSIZE);
	return error;
}

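/*
 * wapbl_replay_can_read(wr, blk, len)
 *
 *	Return nonzero if any of the fs blocks covered by [blk, blk +
 *	len) has journalled contents pending replay, i.e. if the caller
 *	must fetch that range via wapbl_replay_read() rather than from
 *	the file system device.
 */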
int
wapbl_replay_can_read(struct wapbl_replay *wr, daddr_t blk, long len)
{
	int fsblklen = 1 << wr->wr_fs_dev_bshift;

	KDASSERT(wapbl_replay_isopen(wr));
	KASSERT((len % fsblklen) == 0);

	while (len != 0) {
		struct wapbl_blk *wb = wapbl_blkhash_get(wr, blk);
		if (wb)
			return 1;
		len -= fsblklen;
		/* Advance to the next block, as wapbl_replay_read() does. */
		blk++;
	}
	return 0;
}

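/*
 * wapbl_replay_read(wr, data, blk, len)
 *
 *	Overlay journalled contents onto the caller's buffer: for each
 *	fs block in the range that has a journal entry, copy the
 *	journalled version into data; blocks with no entry are left
 *	untouched.
 */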
int
wapbl_replay_read(struct wapbl_replay *wr, void *data, daddr_t blk, long len)
{
	int fsblklen = 1 << wr->wr_fs_dev_bshift;

	KDASSERT(wapbl_replay_isopen(wr));

	KASSERT((len % fsblklen) == 0);

	while (len != 0) {
		struct wapbl_blk *wb = wapbl_blkhash_get(wr, blk);
		if (wb) {
			off_t off = wb->wb_off;
			int error;
			error = wapbl_circ_read(wr, data, fsblklen, &off);
			if (error)
				return error;
		}
		data = (uint8_t *)data + fsblklen;
		len -= fsblklen;
		blk++;
	}
	return 0;
}

#ifdef _KERNEL

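/*
 * Module glue: wapbl may be built as a kernel module of class VFS;
 * init and fini simply defer to wapbl_init() and wapbl_fini().
 */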
MODULE(MODULE_CLASS_VFS, wapbl, NULL);

static int
wapbl_modcmd(modcmd_t cmd, void *arg)
{

	switch (cmd) {
	case MODULE_CMD_INIT:
		wapbl_init();
		return 0;
	case MODULE_CMD_FINI:
		return wapbl_fini();
	default:
		return ENOTTY;
	}
}

#endif /* _KERNEL */