Home | History | Annotate | Line # | Download | only in programs
      1 /*
      2  * Copyright (c) Meta Platforms, Inc. and affiliates.
      3  * All rights reserved.
      4  *
      5  * This source code is licensed under both the BSD-style license (found in the
      6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
      7  * in the COPYING file in the root directory of this source tree).
      8  * You may select, at your option, one of the above-listed licenses.
      9  */
     10 
     11  /*
     12   * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously.
     13   * Current implementation relies on having one thread that reads and one that
     14   * writes.
     15   * Each IO pool supports up to `MAX_IO_JOBS` jobs that can be enqueued for work, but
     16   * are performed serially by the appropriate worker thread.
     17   * Most systems expose better primitives to perform asynchronous IO, such as
     18   * io_uring on newer linux systems. The API is built in such a way that in the
     19   * future we could replace the threads with better solutions when available.
     20   */
     21 
     22 #ifndef ZSTD_FILEIO_ASYNCIO_H
     23 #define ZSTD_FILEIO_ASYNCIO_H
     24 
     25 #if defined (__cplusplus)
     26 extern "C" {
     27 #endif
     28 
     29 #include "../lib/common/mem.h"     /* U32, U64 */
     30 #include "fileio_types.h"
     31 #include "platform.h"
     32 #include "util.h"
     33 #include "../lib/common/pool.h"
     34 #include "../lib/common/threading.h"
     35 
     36 #define MAX_IO_JOBS          (10)
     37 
     38 typedef struct {   /* Base pool state, embedded as `base` in both ReadPoolCtx_t and WritePoolCtx_t */
     39     /* These struct fields should be set only on creation and not changed afterwards */
     40     POOL_ctx* threadPool;
     41     int threadPoolActive;
     42     int totalIoJobs;
     43     const FIO_prefs_t* prefs;
     44     POOL_function poolFunction;
     45 
     46     /* The file this pool currently operates on; change it only by using the provided utility functions */
     47     FILE* file;
     48 
     49     /* The availableJobs and availableJobsCount fields are accessed by both the main and worker threads and
     50      * should only be mutated after locking ioJobsMutex */
     51     ZSTD_pthread_mutex_t ioJobsMutex;
     52     void* availableJobs[MAX_IO_JOBS];
     53     int availableJobsCount;
     54     size_t jobBufferSize;   /* NOTE(review): presumably the buffer size of each job in this pool -- confirm */
     55 } IOPoolCtx_t;
     56 
     57 typedef struct {   /* Read pool: shared IOPoolCtx_t state plus the state of the file being read */
     58     IOPoolCtx_t base;
     59 
     60     /* State regarding the currently read file */
     61     int reachedEof;
     62     U64 nextReadOffset;
     63     U64 waitingOnOffset;
     64 
     65     /* We may hold an IOJob object as needed if we actively expose its buffer. */
     66     void *currentJobHeld;
     67 
     68     /* Coalesce buffer is used to join two buffers in the case where we need to read more bytes than are
     69      * left in the first of them. Shouldn't be accessed from outside of the utility functions. */
     70     U8 *coalesceBuffer;
     71 
     72     /* Read buffer can be used by consumer code; take care when copying this pointer aside, as it might
     73      * change when consuming / refilling the buffer. */
     74     U8 *srcBuffer;
     75     size_t srcBufferLoaded;
     76 
     77     /* We need to know which tasks completed so we can use their buffers when their time comes.
     78      * Should only be accessed after locking base.ioJobsMutex . */
     79     void* completedJobs[MAX_IO_JOBS];
     80     int completedJobsCount;
     81     ZSTD_pthread_cond_t jobCompletedCond;
     82 } ReadPoolCtx_t;
     83 
     84 typedef struct {   /* Write pool: shared IOPoolCtx_t state plus write-specific state */
     85     IOPoolCtx_t base;
     86     unsigned storedSkips;   /* NOTE(review): presumably pending skipped bytes for sparse writes -- confirm */
     87 } WritePoolCtx_t;
     88 
     89 typedef struct {   /* A single IO job: the buffer it owns plus the file it targets */
     90     /* These fields are automatically set and shouldn't be changed by non WritePool code. */
     91     void *ctx;
     92     FILE* file;
     93     void *buffer;
     94     size_t bufferSize;
     95 
     96     /* usedBufferSize should be changed before a job is queued for execution and should contain the
     97      * number of bytes to write from the buffer. */
     98     size_t usedBufferSize;
     99     U64 offset;   /* NOTE(review): presumably the file offset this job's data corresponds to -- confirm */
    100 } IOJob_t;
    101 
    102 /* AIO_supported:
    103  * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
    104 int AIO_supported(void);
    105 
    106 
    107 /* AIO_WritePool_releaseIoJob:
    108  * Releases an acquired job back to the pool. Doesn't execute the job. */
    109 void AIO_WritePool_releaseIoJob(IOJob_t *job);
    110 
    111 /* AIO_WritePool_acquireJob:
    112  * Returns an available write job to be used for a future write. */
    113 IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx);
    114 
    115 /* AIO_WritePool_enqueueAndReacquireWriteJob:
    116  * Enqueues a write job for execution and acquires a new one.
    117  * After execution `job`'s pointed value would change to the newly acquired job.
    118  * Make sure to set `usedBufferSize` to the wanted length before call.
    119  * The queued job shouldn't be used directly after queueing it. */
    120 void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job);
    121 
    122 /* AIO_WritePool_sparseWriteEnd:
    123  * Ends sparse writes to the current file.
    124  * Blocks on completion of all current write jobs before executing. */
    125 void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx);
    126 
    127 /* AIO_WritePool_setFile:
    128  * Sets the destination file for future writes in the pool.
    129  * Requires completion of all queued write jobs and release of all otherwise acquired jobs.
    130  * Also requires ending of sparse write if a previous file was used in sparse mode. */
    131 void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
    132 
    133 /* AIO_WritePool_getFile:
    134  * Returns the file the writePool is currently set to write to. */
    135 FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);
    136 
    137 /* AIO_WritePool_closeFile:
    138  * Ends sparse write and closes the writePool's current file and sets the file to NULL.
    139  * Requires completion of all queued write jobs and release of all otherwise acquired jobs.  */
    140 int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
    141 
    142 /* AIO_WritePool_create:
    143  * Allocates and sets up a new write pool, including its jobs.
    144  * bufferSize should be set to the maximal buffer we want to write to at a time. */
    145 WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);
    146 
    147 /* AIO_WritePool_free:
    148  * Frees and releases a writePool and its resources. Closes destination file. */
    149 void AIO_WritePool_free(WritePoolCtx_t* ctx);
    150 
    151 /* AIO_WritePool_setAsync:
    152  * Allows (de)activating async mode, to be used when the expected overhead
    153  * of asyncio costs more than the expected gains. */
    154 void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async);
    155 
    156 /* AIO_ReadPool_create:
    157  * Allocates and sets up a new readPool, including its jobs.
    158  * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
    159  * as our basic read size. */
    160 ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
    161 
    162 /* AIO_ReadPool_free:
    163  * Frees and releases a readPool and its resources. Closes source file. */
    164 void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
    165 
    166 /* AIO_ReadPool_setAsync:
    167  * Allows (de)activating async mode, to be used when the expected overhead
    168  * of asyncio costs more than the expected gains. */
    169 void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async);
    170 
    171 /* AIO_ReadPool_consumeBytes:
    172  * Consumes n bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
    173 void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);
    174 
    175 /* AIO_ReadPool_fillBuffer:
    176  * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize).
    177  * Returns once srcBuffer has at least n bytes loaded or once we've reached the end of the file.
    178  * Return value is the number of bytes added to the buffer.
    179  * Note that srcBuffer might have up to 2 times bufferSize bytes. */
    180 size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);
    181 
    182 /* AIO_ReadPool_consumeAndRefill:
    183  * Consumes the current buffer and refills it with bufferSize bytes. */
    184 size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);
    185 
    186 /* AIO_ReadPool_setFile:
    187  * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
    188  * Waits for all current enqueued tasks to complete if a previous file was set. */
    189 void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);
    190 
    191 /* AIO_ReadPool_getFile:
    192  * Returns the current file set for the read pool. */
    193 FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);
    194 
    195 /* AIO_ReadPool_closeFile:
    196  * Closes the currently set file. Waits for all current enqueued tasks to complete and resets state. */
    197 int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
    198 
    199 #if defined (__cplusplus)
    200 }
    201 #endif
    202 
    203 #endif /* ZSTD_FILEIO_ASYNCIO_H */
    204