Home | History | Annotate | Line # | Download | only in programs
      1 /*
      2  * Copyright (c) Meta Platforms, Inc. and affiliates.
      3  * All rights reserved.
      4  *
      5  * This source code is licensed under both the BSD-style license (found in the
      6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
      7  * in the COPYING file in the root directory of this source tree).
      8  * You may select, at your option, one of the above-listed licenses.
      9  */
     10 
     11 #include "platform.h"
     12 #include <stdio.h>      /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
     13 #include <stdlib.h>     /* malloc, free */
     14 #include <assert.h>
     15 #include <errno.h>      /* errno */
     16 
     17 #if defined (_MSC_VER)
     18 #  include <sys/stat.h>
     19 #  include <io.h>
     20 #endif
     21 
     22 #include "fileio_asyncio.h"
     23 #include "fileio_common.h"
     24 
     25 /* **********************************************************************
     26  *  Sparse write
     27  ************************************************************************/
     28 
/** AIO_fwriteSparse() :
 *  Writes bufferSize bytes from buffer into file.
 *  When prefs->sparseFileSupport is enabled, runs of zero words are skipped
 *  with LONG_SEEK instead of being written, producing a sparse file.
 *  Trailing zeroes are not flushed : their byte count is accumulated and
 *  returned, so the file offset can catch up on a later call.
*  @return : storedSkips,
*            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
static unsigned
AIO_fwriteSparse(FILE* file,
                 const void* buffer, size_t bufferSize,
                 const FIO_prefs_t* const prefs,
                 unsigned storedSkips)
{
    const size_t* const bufferT = (const size_t*)buffer;   /* Buffer is supposed malloc'ed, hence aligned on size_t */
    size_t bufferSizeT = bufferSize / sizeof(size_t);
    const size_t* const bufferTEnd = bufferT + bufferSizeT;
    const size_t* ptrT = bufferT;
    static const size_t segmentSizeT = (32 KB) / sizeof(size_t);   /* check every 32 KB */

    if (prefs->testMode) return 0;  /* do not output anything in test mode */

    if (!prefs->sparseFileSupport) {  /* normal write */
        size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
        if (sizeCheck != bufferSize)
            EXM_THROW(70, "Write error : cannot write block : %s",
                      strerror(errno));
        return 0;
    }

    /* avoid int overflow : consume accumulated skips 1 GB at a time */
    if (storedSkips > 1 GB) {
        if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
        EXM_THROW(91, "1 GB skip error (sparse file support)");
        storedSkips -= 1 GB;
    }

    /* scan the buffer one 32 KB segment at a time, word-granular */
    while (ptrT < bufferTEnd) {
        size_t nb0T;

        /* adjust last segment if < 32 KB */
        size_t seg0SizeT = segmentSizeT;
        if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
        bufferSizeT -= seg0SizeT;

        /* count leading zeroes */
        for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
        storedSkips += (unsigned)(nb0T * sizeof(size_t));

        if (nb0T != seg0SizeT) {   /* not all 0s */
            size_t const nbNon0ST = seg0SizeT - nb0T;
            /* skip leading zeros : translate pending skips into a hole */
            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
            storedSkips = 0;
            /* write the rest */
            if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
                EXM_THROW(93, "Write error : cannot write block : %s",
                          strerror(errno));
        }
        ptrT += seg0SizeT;
    }

    {   static size_t const maskT = sizeof(size_t)-1;
        if (bufferSize & maskT) {
            /* size not multiple of sizeof(size_t) : implies end of block */
            const char* const restStart = (const char*)bufferTEnd;
            const char* restPtr = restStart;
            const char* const restEnd = (const char*)buffer + bufferSize;
            assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
            /* byte-granular zero scan for the sub-word tail */
            for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
            storedSkips += (unsigned) (restPtr - restStart);
            if (restPtr != restEnd) {
                /* not all remaining bytes are 0 */
                size_t const restSize = (size_t)(restEnd - restPtr);
                if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                    EXM_THROW(92, "Sparse skip error ; try --no-sparse");
                if (fwrite(restPtr, 1, restSize, file) != restSize)
                    EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
                              strerror(errno));
                storedSkips = 0;
            }   }   }

    return storedSkips;
}
    109 
/* AIO_fwriteSparseEnd:
 * Finalizes a sparse-write sequence : if storedSkips trailing zero bytes are
 * still pending, seeks forward storedSkips-1 bytes and writes one explicit
 * zero byte, so the skipped range is materialized as zeroes by the filesystem
 * and the file reaches its full intended length. */
static void
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
    if (prefs->testMode) assert(storedSkips == 0);  /* test mode never writes, hence never skips */
    if (storedSkips>0) {
        assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
        (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
        if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
            EXM_THROW(69, "Final skip error (sparse file support)");
        /* last zero must be explicitly written,
         * so that skipped ones get implicitly translated as zero by FS */
        {   const char lastZeroByte[1] = { 0 };
            if (fwrite(lastZeroByte, 1, 1, file) != 1)
                EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
        }   }
}
    126 
    127 
    128 /* **********************************************************************
    129  *  AsyncIO functionality
    130  ************************************************************************/
    131 
/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise.
 * AsyncIO requires a multithreaded build (ZSTD_MULTITHREAD). */
int AIO_supported(void) {
    int supported = 0;
#ifdef ZSTD_MULTITHREAD
    supported = 1;
#endif
    return supported;
}
    141 
    142 /* ***********************************
    143  *  Generic IoPool implementation
    144  *************************************/
    145 
    146 static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
    147     IOJob_t* const job  = (IOJob_t*) malloc(sizeof(IOJob_t));
    148     void* const buffer = malloc(bufferSize);
    149     if(!job || !buffer)
    150         EXM_THROW(101, "Allocation error : not enough memory");
    151     job->buffer = buffer;
    152     job->bufferSize = bufferSize;
    153     job->usedBufferSize = 0;
    154     job->file = NULL;
    155     job->ctx = ctx;
    156     job->offset = 0;
    157     return job;
    158 }
    159 
    160 
/* AIO_IOPool_createThreadPool:
 * Creates a thread pool and a mutex for threaded IO pool.
 * Only allocates them when prefs->asyncIO is requested; otherwise the ctx is
 * left in synchronous mode (threadPool == NULL, threadPoolActive == 0).
 * Aborts (EXM_THROW) on mutex or pool creation failure. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
    ctx->threadPool = NULL;
    ctx->threadPoolActive = 0;
    if(prefs->asyncIO) {
        if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
            EXM_THROW(102,"Failed creating ioJobsMutex mutex");
        /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
         * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
        assert(MAX_IO_JOBS >= 2);
        ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);  /* single worker thread */
        ctx->threadPoolActive = 1;
        if (!ctx->threadPool)
            EXM_THROW(104, "Failed creating I/O thread pool");
    }
}
    179 
/* AIO_IOPool_init:
 * Allocates and sets a new I/O pool including its included availableJobs.
 * Creates the thread pool first (per prefs->asyncIO), then pre-allocates
 * every job with a bufferSize-sized buffer. */
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
    int i;
    AIO_IOPool_createThreadPool(ctx, prefs);
    ctx->prefs = prefs;
    ctx->poolFunction = poolFunction;
    ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;  /* without async workers only 2 jobs are allocated */
    ctx->availableJobsCount = ctx->totalIoJobs;
    for(i=0; i < ctx->availableJobsCount; i++) {
        ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
    }
    ctx->jobBufferSize = bufferSize;
    ctx->file = NULL;
}
    195 
    196 
    197 /* AIO_IOPool_threadPoolActive:
    198  * Check if current operation uses thread pool.
    199  * Note that in some cases we have a thread pool initialized but choose not to use it. */
    200 static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
    201     return ctx->threadPool && ctx->threadPoolActive;
    202 }
    203 
    204 
    205 /* AIO_IOPool_lockJobsMutex:
    206  * Locks the IO jobs mutex if threading is active */
    207 static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
    208     if(AIO_IOPool_threadPoolActive(ctx))
    209         ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
    210 }
    211 
    212 /* AIO_IOPool_unlockJobsMutex:
    213  * Unlocks the IO jobs mutex if threading is active */
    214 static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
    215     if(AIO_IOPool_threadPoolActive(ctx))
    216         ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
    217 }
    218 
/* AIO_IOPool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job.
 * Thread-safe: pushes the job onto availableJobs under the jobs mutex. */
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount < ctx->totalIoJobs);  /* cannot release more jobs than were created */
    ctx->availableJobs[ctx->availableJobsCount++] = job;
    AIO_IOPool_unlockJobsMutex(ctx);
}
    228 
    229 /* AIO_IOPool_join:
    230  * Waits for all tasks in the pool to finish executing. */
    231 static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
    232     if(AIO_IOPool_threadPoolActive(ctx))
    233         POOL_joinJobs(ctx->threadPool);
    234 }
    235 
    236 /* AIO_IOPool_setThreaded:
    237  * Allows (de)activating threaded mode, to be used when the expected overhead
    238  * of threading costs more than the expected gains. */
    239 static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
    240     assert(threaded == 0 || threaded == 1);
    241     assert(ctx != NULL);
    242     if(ctx->threadPoolActive != threaded) {
    243         AIO_IOPool_join(ctx);
    244         ctx->threadPoolActive = threaded;
    245     }
    246 }
    247 
/* AIO_IOPool_destroy:
 * Releases a previously allocated IO pool. Makes sure all tasks are done and released,
 * then frees each job's buffer and descriptor, the thread pool and the mutex. */
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
    int i;
    if(ctx->threadPool) {
        /* Make sure we finish all tasks and then free the resources */
        AIO_IOPool_join(ctx);
        /* Make sure we are not leaking availableJobs */
        assert(ctx->availableJobsCount == ctx->totalIoJobs);
        POOL_free(ctx->threadPool);
        ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
    }
    assert(ctx->file == NULL);  /* caller must have unset/closed the file first */
    for(i=0; i<ctx->availableJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
        free(job->buffer);
        free(job);
    }
}
    267 
/* AIO_IOPool_acquireJob:
 * Returns an available io job to be used for a future io.
 * The returned job is reset: empty payload, bound to the pool's current file,
 * offset 0. Asserts (rather than blocks) that a job is available. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
    IOJob_t *job;
    assert(ctx->file != NULL || ctx->prefs->testMode);  /* a file must be set, except in test mode */
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount > 0);  /* pool sizing is expected to guarantee availability */
    job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
    AIO_IOPool_unlockJobsMutex(ctx);
    job->usedBufferSize = 0;
    job->file = ctx->file;
    job->offset = 0;
    return job;
}
    282 
    283 
/* AIO_IOPool_setFile:
 * Sets the destination file for future jobs in the pool.
 * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(ctx);  /* drain in-flight jobs before switching files */
    assert(ctx->availableJobsCount == ctx->totalIoJobs);  /* every job must be back in the pool */
    ctx->file = file;
}
    293 
    294 static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
    295     return ctx->file;
    296 }
    297 
    298 /* AIO_IOPool_enqueueJob:
    299  * Enqueues an io job for execution.
    300  * The queued job shouldn't be used directly after queueing it. */
    301 static void AIO_IOPool_enqueueJob(IOJob_t* job) {
    302     IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
    303     if(AIO_IOPool_threadPoolActive(ctx))
    304         POOL_add(ctx->threadPool, ctx->poolFunction, job);
    305     else
    306         ctx->poolFunction(job);
    307 }
    308 
    309 /* ***********************************
    310  *  WritePool implementation
    311  *************************************/
    312 
    313 /* AIO_WritePool_acquireJob:
    314  * Returns an available write job to be used for a future write. */
    315 IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
    316     return AIO_IOPool_acquireJob(&ctx->base);
    317 }
    318 
/* AIO_WritePool_enqueueAndReacquireWriteJob:
 * Queues a write job for execution and acquires a new one.
 * After execution `job`'s pointed value would change to the newly acquired job.
 * Make sure to set `usedBufferSize` to the wanted length before call.
 * The queued job shouldn't be used directly after queueing it.
 * Note: reading (*job)->ctx after the enqueue is safe because a job's ctx
 * field is set once at creation (AIO_IOPool_createIoJob) and never changed. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
    AIO_IOPool_enqueueJob(*job);
    *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
}
    328 
/* AIO_WritePool_sparseWriteEnd:
 * Ends sparse writes to the current file.
 * Blocks on completion of all current write jobs before executing,
 * so storedSkips is final when the last hole is materialized. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
    assert(ctx != NULL);
    AIO_IOPool_join(&ctx->base);  /* wait for pending writes */
    AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
    ctx->storedSkips = 0;
}
    338 
/* AIO_WritePool_setFile:
 * Sets the destination file for future writes in the pool.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs.
 * Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
    AIO_IOPool_setFile(&ctx->base, file);
    assert(ctx->storedSkips == 0);  /* sparse writes must have been ended before switching files */
}
    347 
    348 /* AIO_WritePool_getFile:
    349  * Returns the file the writePool is currently set to write to. */
    350 FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
    351     return AIO_IOPool_getFile(&ctx->base);
    352 }
    353 
/* AIO_WritePool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job.
 * Thin public wrapper over the generic pool release. */
void AIO_WritePool_releaseIoJob(IOJob_t* job) {
    AIO_IOPool_releaseIoJob(job);
}
    359 
    360 /* AIO_WritePool_closeFile:
    361  * Ends sparse write and closes the writePool's current file and sets the file to NULL.
    362  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.  */
    363 int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
    364     FILE* const dstFile = ctx->base.file;
    365     assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
    366     AIO_WritePool_sparseWriteEnd(ctx);
    367     AIO_IOPool_setFile(&ctx->base, NULL);
    368     return fclose(dstFile);
    369 }
    370 
/* AIO_WritePool_executeWriteJob:
 * Executes a write job synchronously. Can be used as a function for a thread pool.
 * Carries the sparse writer's trailing-zero count across jobs via ctx->storedSkips,
 * then returns the job to the pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
    ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
    AIO_IOPool_releaseIoJob(job);
}
    379 
/* AIO_WritePool_create:
 * Allocates and sets a new write pool including its included jobs.
 * @param bufferSize size of each job's write buffer.
 * Aborts (EXM_THROW) on allocation failure. */
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
    ctx->storedSkips = 0;  /* no pending sparse skips yet */
    return ctx;
}
    389 
/* AIO_WritePool_free:
 * Frees and releases a writePool and its resources. Closes destination file if needs to. */
void AIO_WritePool_free(WritePoolCtx_t* ctx) {
    /* Make sure we finish all tasks and then free the resources */
    if(AIO_WritePool_getFile(ctx))
        AIO_WritePool_closeFile(ctx);
    AIO_IOPool_destroy(&ctx->base);
    assert(ctx->storedSkips==0);  /* closeFile / sparseWriteEnd must have flushed trailing skips */
    free(ctx);
}
    400 
/* AIO_WritePool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains.
 * Forwards to the generic pool's threaded-mode toggle. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}
    407 
    408 
    409 /* ***********************************
    410  *  ReadPool implementation
    411  *************************************/
    412 static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
    413     int i;
    414     for(i=0; i<ctx->completedJobsCount; i++) {
    415         IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
    416         AIO_IOPool_releaseIoJob(job);
    417     }
    418     ctx->completedJobsCount = 0;
    419 }
    420 
/* AIO_ReadPool_addJobToCompleted:
 * Moves a finished read job onto the completed list and, in threaded mode,
 * wakes a thread waiting in AIO_ReadPool_getNextCompletedJob. */
static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    AIO_IOPool_lockJobsMutex(&ctx->base);
    assert(ctx->completedJobsCount < MAX_IO_JOBS);  /* list is sized for every job in the pool */
    ctx->completedJobs[ctx->completedJobsCount++] = job;
    if(AIO_IOPool_threadPoolActive(&ctx->base)) {
        ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
    }
    AIO_IOPool_unlockJobsMutex(&ctx->base);
}
    431 
/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
 * if job wasn't found returns NULL.
 * On a match the job is removed from completedJobs (last entry swapped into its slot).
 * IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    int i;
    /* This implementation goes through all completed jobs and looks for the one matching the next offset.
     * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
     * reads to be completed in order) this implementation was chosen as it better fits other asyncio
     * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
    for (i=0; i<ctx->completedJobsCount; i++) {
        job = (IOJob_t *) ctx->completedJobs[i];
        if (job->offset == ctx->waitingOnOffset) {
            /* unordered removal: fill the vacated slot with the last entry */
            ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
            return job;
        }
    }
    return NULL;
}
    452 
    453 /* AIO_ReadPool_numReadsInFlight:
    454  * Returns the number of IO read jobs currently in flight. */
    455 static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
    456     const int jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
    457     return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld));
    458 }
    459 
/* AIO_ReadPool_getNextCompletedJob:
 * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
 * Returns NULL when nothing matches and no reads remain in flight.
 * Would block. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    AIO_IOPool_lockJobsMutex(&ctx->base);

    job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);

    /* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */
    while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
        assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);  /* releases mutex while waiting */
        job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
    }

    if(job) {
        assert(job->offset == ctx->waitingOnOffset);
        ctx->waitingOnOffset += job->usedBufferSize;  /* next expected offset follows this job's payload */
    }

    AIO_IOPool_unlockJobsMutex(&ctx->base);
    return job;
}
    484 
    485 
/* AIO_ReadPool_executeReadJob:
 * Executes a read job synchronously. Can be used as a function for a thread pool.
 * Reads up to bufferSize bytes into the job's buffer; after EOF, subsequent
 * jobs complete immediately with usedBufferSize == 0.
 * Throws (exits) on read errors. */
static void AIO_ReadPool_executeReadJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    if(ctx->reachedEof) {
        /* a previous job hit EOF : complete with an empty payload */
        job->usedBufferSize = 0;
        AIO_ReadPool_addJobToCompleted(job);
        return;
    }
    job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
    if(job->usedBufferSize < job->bufferSize) {
        /* short read : distinguish error, EOF, and the impossible remainder */
        if(ferror(job->file)) {
            EXM_THROW(37, "Read error");
        } else if(feof(job->file)) {
            ctx->reachedEof = 1;
        } else {
            EXM_THROW(37, "Unexpected short read");
        }
    }
    AIO_ReadPool_addJobToCompleted(job);
}
    508 
    509 static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
    510     IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
    511     job->offset = ctx->nextReadOffset;
    512     ctx->nextReadOffset += job->bufferSize;
    513     AIO_IOPool_enqueueJob(job);
    514 }
    515 
    516 static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
    517     while(ctx->base.availableJobsCount) {
    518         AIO_ReadPool_enqueueRead(ctx);
    519     }
    520 }
    521 
/* AIO_ReadPool_setFile:
 * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(&ctx->base);  /* drain any reads still running on the previous file */
    AIO_ReadPool_releaseAllCompletedJobs(ctx);
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
    }
    AIO_IOPool_setFile(&ctx->base, file);
    /* reset read ordering and consumer-buffer state for the new file */
    ctx->nextReadOffset = 0;
    ctx->waitingOnOffset = 0;
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->reachedEof = 0;
    if(file != NULL)
        AIO_ReadPool_startReading(ctx);  /* begin prefetching right away */
}
    542 
/* AIO_ReadPool_create:
 * Allocates and sets a new readPool including its included jobs.
 * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
 * as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);

    /* 2x bufferSize : room to coalesce leftover bytes with a full next job (see AIO_ReadPool_fillBuffer) */
    ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
    if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory");
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->completedJobsCount = 0;
    ctx->currentJobHeld = NULL;

    /* condition variable only needed when jobs complete on a worker thread */
    if(ctx->base.threadPool)
        if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
            EXM_THROW(103,"Failed creating jobCompletedCond cond");

    return ctx;
}
    565 
/* AIO_ReadPool_free:
 * Frees and releases a readPool and its resources. Closes source file if one is still set. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
    if(AIO_ReadPool_getFile(ctx))
        AIO_ReadPool_closeFile(ctx);  /* also drains and resets pool state */
    if(ctx->base.threadPool)
        ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
    AIO_IOPool_destroy(&ctx->base);
    free(ctx->coalesceBuffer);
    free(ctx);
}
    577 
    578 /* AIO_ReadPool_consumeBytes:
    579  * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
    580 void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
    581     assert(n <= ctx->srcBufferLoaded);
    582     ctx->srcBufferLoaded -= n;
    583     ctx->srcBuffer += n;
    584 }
    585 
/* AIO_ReadPool_releaseCurrentHeldAndGetNext:
 * Release the current held job and get the next one, returns NULL if no next job available.
 * Releasing frees a job buffer, so a replacement read is enqueued immediately
 * to keep the prefetch pipeline full. */
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
        AIO_ReadPool_enqueueRead(ctx);
    }
    ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);  /* may block in threaded mode */
    return (IOJob_t*) ctx->currentJobHeld;
}
    597 
/* AIO_ReadPool_fillBuffer:
 * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
 * Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file.
 * Return value is the number of bytes added to the buffer.
 * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
    IOJob_t *job;
    int useCoalesce = 0;
    if(n > ctx->base.jobBufferSize)
        n = ctx->base.jobBufferSize;  /* a single job can deliver at most jobBufferSize bytes */

    /* We are good, don't read anything */
    if (ctx->srcBufferLoaded >= n)
        return 0;

    /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
     * and coalesce the remaining bytes with the next job's buffer */
    if (ctx->srcBufferLoaded > 0) {
        useCoalesce = 1;
        memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
        ctx->srcBuffer = ctx->coalesceBuffer;
    }

    /* Read the next chunk */
    job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
    if(!job)
        return 0;  /* no completed job and none in flight : EOF fully drained */
    if(useCoalesce) {
        /* leftover + full chunk always fit : coalesceBuffer is sized 2*jobBufferSize */
        assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
        memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
        ctx->srcBufferLoaded += job->usedBufferSize;
    }
    else {
        /* no leftover : expose the job's buffer directly, zero-copy */
        ctx->srcBuffer = (U8 *) job->buffer;
        ctx->srcBufferLoaded = job->usedBufferSize;
    }
    return job->usedBufferSize;
}
    636 
    637 /* AIO_ReadPool_consumeAndRefill:
    638  * Consumes the current buffer and refills it with bufferSize bytes. */
    639 size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
    640     AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
    641     return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
    642 }
    643 
    644 /* AIO_ReadPool_getFile:
    645  * Returns the current file set for the read pool. */
    646 FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
    647     return AIO_IOPool_getFile(&ctx->base);
    648 }
    649 
    650 /* AIO_ReadPool_closeFile:
    651  * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
    652 int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
    653     FILE* const file = AIO_ReadPool_getFile(ctx);
    654     AIO_ReadPool_setFile(ctx, NULL);
    655     return fclose(file);
    656 }
    657 
/* AIO_ReadPool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains.
 * Forwards to the generic pool's threaded-mode toggle. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}
    664