Home | History | Annotate | Line # | Download | only in pzstd
      1  1.1  christos /*
      2  1.1  christos  * Copyright (c) Meta Platforms, Inc. and affiliates.
      3  1.1  christos  * All rights reserved.
      4  1.1  christos  *
      5  1.1  christos  * This source code is licensed under both the BSD-style license (found in the
      6  1.1  christos  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
      7  1.1  christos  * in the COPYING file in the root directory of this source tree).
      8  1.1  christos  */
      9  1.1  christos #include "platform.h"   /* Large Files support, SET_BINARY_MODE */
     10  1.1  christos #include "Pzstd.h"
     11  1.1  christos #include "SkippableFrame.h"
     12  1.1  christos #include "utils/FileSystem.h"
     13  1.1  christos #include "utils/Portability.h"
     14  1.1  christos #include "utils/Range.h"
     15  1.1  christos #include "utils/ScopeGuard.h"
     16  1.1  christos #include "utils/ThreadPool.h"
     17  1.1  christos #include "utils/WorkQueue.h"
     18  1.1  christos 
     19  1.1  christos #include <algorithm>
     20  1.1  christos #include <chrono>
     21  1.1  christos #include <cinttypes>
     22  1.1  christos #include <cstddef>
     23  1.1  christos #include <cstdio>
     24  1.1  christos #include <memory>
     25  1.1  christos #include <string>
     26  1.1  christos 
     27  1.1  christos 
     28  1.1  christos namespace pzstd {
     29  1.1  christos 
     30  1.1  christos namespace {
     31  1.1  christos #ifdef _WIN32
     32  1.1  christos const std::string nullOutput = "nul";
     33  1.1  christos #else
     34  1.1  christos const std::string nullOutput = "/dev/null";
     35  1.1  christos #endif
     36  1.1  christos }
     37  1.1  christos 
     38  1.1  christos using std::size_t;
     39  1.1  christos 
     40  1.1  christos static std::uintmax_t fileSizeOrZero(const std::string &file) {
     41  1.1  christos   if (file == "-") {
     42  1.1  christos     return 0;
     43  1.1  christos   }
     44  1.1  christos   std::error_code ec;
     45  1.1  christos   auto size = file_size(file, ec);
     46  1.1  christos   if (ec) {
     47  1.1  christos     size = 0;
     48  1.1  christos   }
     49  1.1  christos   return size;
     50  1.1  christos }
     51  1.1  christos 
     52  1.1  christos static std::uint64_t handleOneInput(const Options &options,
     53  1.1  christos                              const std::string &inputFile,
     54  1.1  christos                              FILE* inputFd,
     55  1.1  christos                              const std::string &outputFile,
     56  1.1  christos                              FILE* outputFd,
     57  1.1  christos                              SharedState& state) {
     58  1.1  christos   auto inputSize = fileSizeOrZero(inputFile);
     59  1.1  christos   // WorkQueue outlives ThreadPool so in the case of error we are certain
     60  1.1  christos   // we don't accidentally try to call push() on it after it is destroyed
     61  1.1  christos   WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
     62  1.1  christos   std::uint64_t bytesRead;
     63  1.1  christos   std::uint64_t bytesWritten;
     64  1.1  christos   {
     65  1.1  christos     // Initialize the (de)compression thread pool with numThreads
     66  1.1  christos     ThreadPool executor(options.numThreads);
     67  1.1  christos     // Run the reader thread on an extra thread
     68  1.1  christos     ThreadPool readExecutor(1);
     69  1.1  christos     if (!options.decompress) {
     70  1.1  christos       // Add a job that reads the input and starts all the compression jobs
     71  1.1  christos       readExecutor.add(
     72  1.1  christos           [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
     73  1.1  christos             bytesRead = asyncCompressChunks(
     74  1.1  christos                 state,
     75  1.1  christos                 outs,
     76  1.1  christos                 executor,
     77  1.1  christos                 inputFd,
     78  1.1  christos                 inputSize,
     79  1.1  christos                 options.numThreads,
     80  1.1  christos                 options.determineParameters());
     81  1.1  christos           });
     82  1.1  christos       // Start writing
     83  1.1  christos       bytesWritten = writeFile(state, outs, outputFd, options.decompress);
     84  1.1  christos     } else {
     85  1.1  christos       // Add a job that reads the input and starts all the decompression jobs
     86  1.1  christos       readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
     87  1.1  christos         bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
     88  1.1  christos       });
     89  1.1  christos       // Start writing
     90  1.1  christos       bytesWritten = writeFile(state, outs, outputFd, options.decompress);
     91  1.1  christos     }
     92  1.1  christos   }
     93  1.1  christos   if (!state.errorHolder.hasError()) {
     94  1.1  christos     std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
     95  1.1  christos     std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
     96  1.1  christos     if (!options.decompress) {
     97  1.1  christos       double ratio = static_cast<double>(bytesWritten) /
     98  1.1  christos                      static_cast<double>(bytesRead + !bytesRead);
     99  1.1  christos       state.log(kLogInfo, "%-20s :%6.2f%%   (%6" PRIu64 " => %6" PRIu64
    100  1.1  christos                    " bytes, %s)\n",
    101  1.1  christos                    inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
    102  1.1  christos                    outputFileName.c_str());
    103  1.1  christos     } else {
    104  1.1  christos       state.log(kLogInfo, "%-20s: %" PRIu64 " bytes \n",
    105  1.1  christos                    inputFileName.c_str(),bytesWritten);
    106  1.1  christos     }
    107  1.1  christos   }
    108  1.1  christos   return bytesWritten;
    109  1.1  christos }
    110  1.1  christos 
    111  1.1  christos static FILE *openInputFile(const std::string &inputFile,
    112  1.1  christos                            ErrorHolder &errorHolder) {
    113  1.1  christos   if (inputFile == "-") {
    114  1.1  christos     SET_BINARY_MODE(stdin);
    115  1.1  christos     return stdin;
    116  1.1  christos   }
    117  1.1  christos   // Check if input file is a directory
    118  1.1  christos   {
    119  1.1  christos     std::error_code ec;
    120  1.1  christos     if (is_directory(inputFile, ec)) {
    121  1.1  christos       errorHolder.setError("Output file is a directory -- ignored");
    122  1.1  christos       return nullptr;
    123  1.1  christos     }
    124  1.1  christos   }
    125  1.1  christos   auto inputFd = std::fopen(inputFile.c_str(), "rb");
    126  1.1  christos   if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) {
    127  1.1  christos     return nullptr;
    128  1.1  christos   }
    129  1.1  christos   return inputFd;
    130  1.1  christos }
    131  1.1  christos 
    132  1.1  christos static FILE *openOutputFile(const Options &options,
    133  1.1  christos                             const std::string &outputFile,
    134  1.1  christos                             SharedState& state) {
    135  1.1  christos   if (outputFile == "-") {
    136  1.1  christos     SET_BINARY_MODE(stdout);
    137  1.1  christos     return stdout;
    138  1.1  christos   }
    139  1.1  christos   // Check if the output file exists and then open it
    140  1.1  christos   if (!options.overwrite && outputFile != nullOutput) {
    141  1.1  christos     auto outputFd = std::fopen(outputFile.c_str(), "rb");
    142  1.1  christos     if (outputFd != nullptr) {
    143  1.1  christos       std::fclose(outputFd);
    144  1.1  christos       if (!state.log.logsAt(kLogInfo)) {
    145  1.1  christos         state.errorHolder.setError("Output file exists");
    146  1.1  christos         return nullptr;
    147  1.1  christos       }
    148  1.1  christos       state.log(
    149  1.1  christos           kLogInfo,
    150  1.1  christos           "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
    151  1.1  christos           outputFile.c_str());
    152  1.1  christos       int c = getchar();
    153  1.1  christos       if (c != 'y' && c != 'Y') {
    154  1.1  christos         state.errorHolder.setError("Not overwritten");
    155  1.1  christos         return nullptr;
    156  1.1  christos       }
    157  1.1  christos     }
    158  1.1  christos   }
    159  1.1  christos   auto outputFd = std::fopen(outputFile.c_str(), "wb");
    160  1.1  christos   if (!state.errorHolder.check(
    161  1.1  christos           outputFd != nullptr, "Failed to open output file")) {
    162  1.1  christos     return nullptr;
    163  1.1  christos   }
    164  1.1  christos   return outputFd;
    165  1.1  christos }
    166  1.1  christos 
    167  1.1  christos int pzstdMain(const Options &options) {
    168  1.1  christos   int returnCode = 0;
    169  1.1  christos   SharedState state(options);
    170  1.1  christos   for (const auto& input : options.inputFiles) {
    171  1.1  christos     // Setup the shared state
    172  1.1  christos     auto printErrorGuard = makeScopeGuard([&] {
    173  1.1  christos       if (state.errorHolder.hasError()) {
    174  1.1  christos         returnCode = 1;
    175  1.1  christos         state.log(kLogError, "pzstd: %s: %s.\n", input.c_str(),
    176  1.1  christos                   state.errorHolder.getError().c_str());
    177  1.1  christos       }
    178  1.1  christos     });
    179  1.1  christos     // Open the input file
    180  1.1  christos     auto inputFd = openInputFile(input, state.errorHolder);
    181  1.1  christos     if (inputFd == nullptr) {
    182  1.1  christos       continue;
    183  1.1  christos     }
    184  1.1  christos     auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
    185  1.1  christos     // Open the output file
    186  1.1  christos     auto outputFile = options.getOutputFile(input);
    187  1.1  christos     if (!state.errorHolder.check(outputFile != "",
    188  1.1  christos                            "Input file does not have extension .zst")) {
    189  1.1  christos       continue;
    190  1.1  christos     }
    191  1.1  christos     auto outputFd = openOutputFile(options, outputFile, state);
    192  1.1  christos     if (outputFd == nullptr) {
    193  1.1  christos       continue;
    194  1.1  christos     }
    195  1.1  christos     auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
    196  1.1  christos     // (de)compress the file
    197  1.1  christos     handleOneInput(options, input, inputFd, outputFile, outputFd, state);
    198  1.1  christos     if (state.errorHolder.hasError()) {
    199  1.1  christos       continue;
    200  1.1  christos     }
    201  1.1  christos     // Delete the input file if necessary
    202  1.1  christos     if (!options.keepSource) {
    203  1.1  christos       // Be sure that we are done and have written everything before we delete
    204  1.1  christos       if (!state.errorHolder.check(std::fclose(inputFd) == 0,
    205  1.1  christos                              "Failed to close input file")) {
    206  1.1  christos         continue;
    207  1.1  christos       }
    208  1.1  christos       closeInputGuard.dismiss();
    209  1.1  christos       if (!state.errorHolder.check(std::fclose(outputFd) == 0,
    210  1.1  christos                              "Failed to close output file")) {
    211  1.1  christos         continue;
    212  1.1  christos       }
    213  1.1  christos       closeOutputGuard.dismiss();
    214  1.1  christos       if (std::remove(input.c_str()) != 0) {
    215  1.1  christos         state.errorHolder.setError("Failed to remove input file");
    216  1.1  christos         continue;
    217  1.1  christos       }
    218  1.1  christos     }
    219  1.1  christos   }
    220  1.1  christos   // Returns 1 if any of the files failed to (de)compress.
    221  1.1  christos   return returnCode;
    222  1.1  christos }
    223  1.1  christos 
    224  1.1  christos /// Construct a `ZSTD_inBuffer` that points to the data in `buffer`.
    225  1.1  christos static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) {
    226  1.1  christos   return ZSTD_inBuffer{buffer.data(), buffer.size(), 0};
    227  1.1  christos }
    228  1.1  christos 
    229  1.1  christos /**
    230  1.1  christos  * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by
    231  1.1  christos  * `inBuffer.pos`.
    232  1.1  christos  */
    233  1.1  christos void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) {
    234  1.1  christos   auto pos = inBuffer.pos;
    235  1.1  christos   inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + pos;
    236  1.1  christos   inBuffer.size -= pos;
    237  1.1  christos   inBuffer.pos = 0;
    238  1.1  christos   return buffer.advance(pos);
    239  1.1  christos }
    240  1.1  christos 
    241  1.1  christos /// Construct a `ZSTD_outBuffer` that points to the data in `buffer`.
    242  1.1  christos static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) {
    243  1.1  christos   return ZSTD_outBuffer{buffer.data(), buffer.size(), 0};
    244  1.1  christos }
    245  1.1  christos 
    246  1.1  christos /**
    247  1.1  christos  * Split `buffer` and advance `outBuffer` by the amount of data written, as
    248  1.1  christos  * indicated by `outBuffer.pos`.
    249  1.1  christos  */
    250  1.1  christos Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
    251  1.1  christos   auto pos = outBuffer.pos;
    252  1.1  christos   outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + pos;
    253  1.1  christos   outBuffer.size -= pos;
    254  1.1  christos   outBuffer.pos = 0;
    255  1.1  christos   return buffer.splitAt(pos);
    256  1.1  christos }
    257  1.1  christos 
    258  1.1  christos /**
    259  1.1  christos  * Stream chunks of input from `in`, compress it, and stream it out to `out`.
    260  1.1  christos  *
    261  1.1  christos  * @param state        The shared state
    262  1.1  christos  * @param in           Queue that we `pop()` input buffers from
    263  1.1  christos  * @param out          Queue that we `push()` compressed output buffers to
    264  1.1  christos  * @param maxInputSize An upper bound on the size of the input
    265  1.1  christos  */
    266  1.1  christos static void compress(
    267  1.1  christos     SharedState& state,
    268  1.1  christos     std::shared_ptr<BufferWorkQueue> in,
    269  1.1  christos     std::shared_ptr<BufferWorkQueue> out,
    270  1.1  christos     size_t maxInputSize) {
    271  1.1  christos   auto& errorHolder = state.errorHolder;
    272  1.1  christos   auto guard = makeScopeGuard([&] { out->finish(); });
    273  1.1  christos   // Initialize the CCtx
    274  1.1  christos   auto ctx = state.cStreamPool->get();
    275  1.1  christos   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
    276  1.1  christos     return;
    277  1.1  christos   }
    278  1.1  christos   {
    279  1.1  christos     auto err = ZSTD_CCtx_reset(ctx.get(), ZSTD_reset_session_only);
    280  1.1  christos     if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
    281  1.1  christos       return;
    282  1.1  christos     }
    283  1.1  christos   }
    284  1.1  christos 
    285  1.1  christos   // Allocate space for the result
    286  1.1  christos   auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize));
    287  1.1  christos   auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
    288  1.1  christos   {
    289  1.1  christos     Buffer inBuffer;
    290  1.1  christos     // Read a buffer in from the input queue
    291  1.1  christos     while (in->pop(inBuffer) && !errorHolder.hasError()) {
    292  1.1  christos       auto zstdInBuffer = makeZstdInBuffer(inBuffer);
    293  1.1  christos       // Compress the whole buffer and send it to the output queue
    294  1.1  christos       while (!inBuffer.empty() && !errorHolder.hasError()) {
    295  1.1  christos         if (!errorHolder.check(
    296  1.1  christos                 !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
    297  1.1  christos           return;
    298  1.1  christos         }
    299  1.1  christos         // Compress
    300  1.1  christos         auto err =
    301  1.1  christos             ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
    302  1.1  christos         if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
    303  1.1  christos           return;
    304  1.1  christos         }
    305  1.1  christos         // Split the compressed data off outBuffer and pass to the output queue
    306  1.1  christos         out->push(split(outBuffer, zstdOutBuffer));
    307  1.1  christos         // Forget about the data we already compressed
    308  1.1  christos         advance(inBuffer, zstdInBuffer);
    309  1.1  christos       }
    310  1.1  christos     }
    311  1.1  christos   }
    312  1.1  christos   // Write the epilog
    313  1.1  christos   size_t bytesLeft;
    314  1.1  christos   do {
    315  1.1  christos     if (!errorHolder.check(
    316  1.1  christos             !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
    317  1.1  christos       return;
    318  1.1  christos     }
    319  1.1  christos     bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer);
    320  1.1  christos     if (!errorHolder.check(
    321  1.1  christos             !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) {
    322  1.1  christos       return;
    323  1.1  christos     }
    324  1.1  christos     out->push(split(outBuffer, zstdOutBuffer));
    325  1.1  christos   } while (bytesLeft != 0 && !errorHolder.hasError());
    326  1.1  christos }
    327  1.1  christos 
    328  1.1  christos /**
    329  1.1  christos  * Calculates how large each independently compressed frame should be.
    330  1.1  christos  *
    331  1.1  christos  * @param size       The size of the source if known, 0 otherwise
    332  1.1  christos  * @param numThreads The number of threads available to run compression jobs on
    333  1.1  christos  * @param params     The zstd parameters to be used for compression
    334  1.1  christos  */
    335  1.1  christos static size_t calculateStep(
    336  1.1  christos     std::uintmax_t size,
    337  1.1  christos     size_t numThreads,
    338  1.1  christos     const ZSTD_parameters &params) {
    339  1.1  christos   (void)size;
    340  1.1  christos   (void)numThreads;
    341  1.1  christos   // Not validated to work correctly for window logs > 23.
    342  1.1  christos   // It will definitely fail if windowLog + 2 is >= 4GB because
    343  1.1  christos   // the skippable frame can only store sizes up to 4GB.
    344  1.1  christos   assert(params.cParams.windowLog <= 23);
    345  1.1  christos   return size_t{1} << (params.cParams.windowLog + 2);
    346  1.1  christos }
    347  1.1  christos 
    348  1.1  christos namespace {
    349  1.1  christos enum class FileStatus { Continue, Done, Error };
    350  1.1  christos /// Determines the status of the file descriptor `fd`.
    351  1.1  christos FileStatus fileStatus(FILE* fd) {
    352  1.1  christos   if (std::feof(fd)) {
    353  1.1  christos     return FileStatus::Done;
    354  1.1  christos   } else if (std::ferror(fd)) {
    355  1.1  christos     return FileStatus::Error;
    356  1.1  christos   }
    357  1.1  christos   return FileStatus::Continue;
    358  1.1  christos }
    359  1.1  christos } // anonymous namespace
    360  1.1  christos 
    361  1.1  christos /**
    362  1.1  christos  * Reads `size` data in chunks of `chunkSize` and puts it into `queue`.
    363  1.1  christos  * Will read less if an error or EOF occurs.
    364  1.1  christos  * Returns the status of the file after all of the reads have occurred.
    365  1.1  christos  */
    366  1.1  christos static FileStatus
    367  1.1  christos readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
    368  1.1  christos          std::uint64_t *totalBytesRead) {
    369  1.1  christos   Buffer buffer(size);
    370  1.1  christos   while (!buffer.empty()) {
    371  1.1  christos     auto bytesRead =
    372  1.1  christos         std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
    373  1.1  christos     *totalBytesRead += bytesRead;
    374  1.1  christos     queue.push(buffer.splitAt(bytesRead));
    375  1.1  christos     auto status = fileStatus(fd);
    376  1.1  christos     if (status != FileStatus::Continue) {
    377  1.1  christos       return status;
    378  1.1  christos     }
    379  1.1  christos   }
    380  1.1  christos   return FileStatus::Continue;
    381  1.1  christos }
    382  1.1  christos 
    383  1.1  christos std::uint64_t asyncCompressChunks(
    384  1.1  christos     SharedState& state,
    385  1.1  christos     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
    386  1.1  christos     ThreadPool& executor,
    387  1.1  christos     FILE* fd,
    388  1.1  christos     std::uintmax_t size,
    389  1.1  christos     size_t numThreads,
    390  1.1  christos     ZSTD_parameters params) {
    391  1.1  christos   auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
    392  1.1  christos   std::uint64_t bytesRead = 0;
    393  1.1  christos 
    394  1.1  christos   // Break the input up into chunks of size `step` and compress each chunk
    395  1.1  christos   // independently.
    396  1.1  christos   size_t step = calculateStep(size, numThreads, params);
    397  1.1  christos   state.log(kLogDebug, "Chosen frame size: %zu\n", step);
    398  1.1  christos   auto status = FileStatus::Continue;
    399  1.1  christos   while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
    400  1.1  christos     // Make a new input queue that we will put the chunk's input data into.
    401  1.1  christos     auto in = std::make_shared<BufferWorkQueue>();
    402  1.1  christos     auto inGuard = makeScopeGuard([&] { in->finish(); });
    403  1.1  christos     // Make a new output queue that compress will put the compressed data into.
    404  1.1  christos     auto out = std::make_shared<BufferWorkQueue>();
    405  1.1  christos     // Start compression in the thread pool
    406  1.1  christos     executor.add([&state, in, out, step] {
    407  1.1  christos       return compress(
    408  1.1  christos           state, std::move(in), std::move(out), step);
    409  1.1  christos     });
    410  1.1  christos     // Pass the output queue to the writer thread.
    411  1.1  christos     chunks.push(std::move(out));
    412  1.1  christos     state.log(kLogVerbose, "%s\n", "Starting a new frame");
    413  1.1  christos     // Fill the input queue for the compression job we just started
    414  1.1  christos     status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
    415  1.1  christos   }
    416  1.1  christos   state.errorHolder.check(status != FileStatus::Error, "Error reading input");
    417  1.1  christos   return bytesRead;
    418  1.1  christos }
    419  1.1  christos 
    420  1.1  christos /**
    421  1.1  christos  * Decompress a frame, whose data is streamed into `in`, and stream the output
    422  1.1  christos  * to `out`.
    423  1.1  christos  *
    424  1.1  christos  * @param state        The shared state
    425  1.1  christos  * @param in           Queue that we `pop()` input buffers from. It contains
    426  1.1  christos  *                      exactly one compressed frame.
    427  1.1  christos  * @param out          Queue that we `push()` decompressed output buffers to
    428  1.1  christos  */
    429  1.1  christos static void decompress(
    430  1.1  christos     SharedState& state,
    431  1.1  christos     std::shared_ptr<BufferWorkQueue> in,
    432  1.1  christos     std::shared_ptr<BufferWorkQueue> out) {
    433  1.1  christos   auto& errorHolder = state.errorHolder;
    434  1.1  christos   auto guard = makeScopeGuard([&] { out->finish(); });
    435  1.1  christos   // Initialize the DCtx
    436  1.1  christos   auto ctx = state.dStreamPool->get();
    437  1.1  christos   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
    438  1.1  christos     return;
    439  1.1  christos   }
    440  1.1  christos   {
    441  1.1  christos     auto err = ZSTD_DCtx_reset(ctx.get(), ZSTD_reset_session_only);
    442  1.1  christos     if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
    443  1.1  christos       return;
    444  1.1  christos     }
    445  1.1  christos   }
    446  1.1  christos 
    447  1.1  christos   const size_t outSize = ZSTD_DStreamOutSize();
    448  1.1  christos   Buffer inBuffer;
    449  1.1  christos   size_t returnCode = 0;
    450  1.1  christos   // Read a buffer in from the input queue
    451  1.1  christos   while (in->pop(inBuffer) && !errorHolder.hasError()) {
    452  1.1  christos     auto zstdInBuffer = makeZstdInBuffer(inBuffer);
    453  1.1  christos     // Decompress the whole buffer and send it to the output queue
    454  1.1  christos     while (!inBuffer.empty() && !errorHolder.hasError()) {
    455  1.1  christos       // Allocate a buffer with at least outSize bytes.
    456  1.1  christos       Buffer outBuffer(outSize);
    457  1.1  christos       auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
    458  1.1  christos       // Decompress
    459  1.1  christos       returnCode =
    460  1.1  christos           ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
    461  1.1  christos       if (!errorHolder.check(
    462  1.1  christos               !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
    463  1.1  christos         return;
    464  1.1  christos       }
    465  1.1  christos       // Pass the buffer with the decompressed data to the output queue
    466  1.1  christos       out->push(split(outBuffer, zstdOutBuffer));
    467  1.1  christos       // Advance past the input we already read
    468  1.1  christos       advance(inBuffer, zstdInBuffer);
    469  1.1  christos       if (returnCode == 0) {
    470  1.1  christos         // The frame is over, prepare to (maybe) start a new frame
    471  1.1  christos         ZSTD_initDStream(ctx.get());
    472  1.1  christos       }
    473  1.1  christos     }
    474  1.1  christos   }
    475  1.1  christos   if (!errorHolder.check(returnCode <= 1, "Incomplete block")) {
    476  1.1  christos     return;
    477  1.1  christos   }
    478  1.1  christos   // We've given ZSTD_decompressStream all of our data, but there may still
    479  1.1  christos   // be data to read.
    480  1.1  christos   while (returnCode == 1) {
    481  1.1  christos     // Allocate a buffer with at least outSize bytes.
    482  1.1  christos     Buffer outBuffer(outSize);
    483  1.1  christos     auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
    484  1.1  christos     // Pass in no input.
    485  1.1  christos     ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0};
    486  1.1  christos     // Decompress
    487  1.1  christos     returnCode =
    488  1.1  christos         ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
    489  1.1  christos     if (!errorHolder.check(
    490  1.1  christos             !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
    491  1.1  christos       return;
    492  1.1  christos     }
    493  1.1  christos     // Pass the buffer with the decompressed data to the output queue
    494  1.1  christos     out->push(split(outBuffer, zstdOutBuffer));
    495  1.1  christos   }
    496  1.1  christos }
    497  1.1  christos 
    498  1.1  christos std::uint64_t asyncDecompressFrames(
    499  1.1  christos     SharedState& state,
    500  1.1  christos     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
    501  1.1  christos     ThreadPool& executor,
    502  1.1  christos     FILE* fd) {
    503  1.1  christos   auto framesGuard = makeScopeGuard([&] { frames.finish(); });
    504  1.1  christos   std::uint64_t totalBytesRead = 0;
    505  1.1  christos 
    506  1.1  christos   // Split the source up into its component frames.
    507  1.1  christos   // If we find our recognized skippable frame we know the next frames size
    508  1.1  christos   // which means that we can decompress each standard frame in independently.
    509  1.1  christos   // Otherwise, we will decompress using only one decompression task.
    510  1.1  christos   const size_t chunkSize = ZSTD_DStreamInSize();
    511  1.1  christos   auto status = FileStatus::Continue;
    512  1.1  christos   while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
    513  1.1  christos     // Make a new input queue that we will put the frames's bytes into.
    514  1.1  christos     auto in = std::make_shared<BufferWorkQueue>();
    515  1.1  christos     auto inGuard = makeScopeGuard([&] { in->finish(); });
    516  1.1  christos     // Make a output queue that decompress will put the decompressed data into
    517  1.1  christos     auto out = std::make_shared<BufferWorkQueue>();
    518  1.1  christos 
    519  1.1  christos     size_t frameSize;
    520  1.1  christos     {
    521  1.1  christos       // Calculate the size of the next frame.
    522  1.1  christos       // frameSize is 0 if the frame info can't be decoded.
    523  1.1  christos       Buffer buffer(SkippableFrame::kSize);
    524  1.1  christos       auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
    525  1.1  christos       totalBytesRead += bytesRead;
    526  1.1  christos       status = fileStatus(fd);
    527  1.1  christos       if (bytesRead == 0 && status != FileStatus::Continue) {
    528  1.1  christos         break;
    529  1.1  christos       }
    530  1.1  christos       buffer.subtract(buffer.size() - bytesRead);
    531  1.1  christos       frameSize = SkippableFrame::tryRead(buffer.range());
    532  1.1  christos       in->push(std::move(buffer));
    533  1.1  christos     }
    534  1.1  christos     if (frameSize == 0) {
    535  1.1  christos       // We hit a non SkippableFrame, so this will be the last job.
    536  1.1  christos       // Make sure that we don't use too much memory
    537  1.1  christos       in->setMaxSize(64);
    538  1.1  christos       out->setMaxSize(64);
    539  1.1  christos     }
    540  1.1  christos     // Start decompression in the thread pool
    541  1.1  christos     executor.add([&state, in, out] {
    542  1.1  christos       return decompress(state, std::move(in), std::move(out));
    543  1.1  christos     });
    544  1.1  christos     // Pass the output queue to the writer thread
    545  1.1  christos     frames.push(std::move(out));
    546  1.1  christos     if (frameSize == 0) {
    547  1.1  christos       // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
    548  1.1  christos       // Pass the rest of the source to this decompression task
    549  1.1  christos       state.log(kLogVerbose, "%s\n",
    550  1.1  christos           "Input not in pzstd format, falling back to serial decompression");
    551  1.1  christos       while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
    552  1.1  christos         status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
    553  1.1  christos       }
    554  1.1  christos       break;
    555  1.1  christos     }
    556  1.1  christos     state.log(kLogVerbose, "Decompressing a frame of size %zu", frameSize);
    557  1.1  christos     // Fill the input queue for the decompression job we just started
    558  1.1  christos     status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
    559  1.1  christos   }
    560  1.1  christos   state.errorHolder.check(status != FileStatus::Error, "Error reading input");
    561  1.1  christos   return totalBytesRead;
    562  1.1  christos }
    563  1.1  christos 
    564  1.1  christos /// Write `data` to `fd`, returns true iff success.
    565  1.1  christos static bool writeData(ByteRange data, FILE* fd) {
    566  1.1  christos   while (!data.empty()) {
    567  1.1  christos     data.advance(std::fwrite(data.begin(), 1, data.size(), fd));
    568  1.1  christos     if (std::ferror(fd)) {
    569  1.1  christos       return false;
    570  1.1  christos     }
    571  1.1  christos   }
    572  1.1  christos   return true;
    573  1.1  christos }
    574  1.1  christos 
    575  1.1  christos std::uint64_t writeFile(
    576  1.1  christos     SharedState& state,
    577  1.1  christos     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
    578  1.1  christos     FILE* outputFd,
    579  1.1  christos     bool decompress) {
    580  1.1  christos   auto& errorHolder = state.errorHolder;
    581  1.1  christos   auto lineClearGuard = makeScopeGuard([&state] {
    582  1.1  christos     state.log.clear(kLogInfo);
    583  1.1  christos   });
    584  1.1  christos   std::uint64_t bytesWritten = 0;
    585  1.1  christos   std::shared_ptr<BufferWorkQueue> out;
    586  1.1  christos   // Grab the output queue for each decompression job (in order).
    587  1.1  christos   while (outs.pop(out)) {
    588  1.1  christos     if (errorHolder.hasError()) {
    589  1.1  christos       continue;
    590  1.1  christos     }
    591  1.1  christos     if (!decompress) {
    592  1.1  christos       // If we are compressing and want to write skippable frames we can't
    593  1.1  christos       // start writing before compression is done because we need to know the
    594  1.1  christos       // compressed size.
    595  1.1  christos       // Wait for the compressed size to be available and write skippable frame
    596  1.1  christos       assert(uint64_t(out->size()) < uint64_t(1) << 32);
    597  1.1  christos       SkippableFrame frame(uint32_t(out->size()));
    598  1.1  christos       if (!writeData(frame.data(), outputFd)) {
    599  1.1  christos         errorHolder.setError("Failed to write output");
    600  1.1  christos         return bytesWritten;
    601  1.1  christos       }
    602  1.1  christos       bytesWritten += frame.kSize;
    603  1.1  christos     }
    604  1.1  christos     // For each chunk of the frame: Pop it from the queue and write it
    605  1.1  christos     Buffer buffer;
    606  1.1  christos     while (out->pop(buffer) && !errorHolder.hasError()) {
    607  1.1  christos       if (!writeData(buffer.range(), outputFd)) {
    608  1.1  christos         errorHolder.setError("Failed to write output");
    609  1.1  christos         return bytesWritten;
    610  1.1  christos       }
    611  1.1  christos       bytesWritten += buffer.size();
    612  1.1  christos       state.log.update(kLogInfo, "Written: %u MB   ",
    613  1.1  christos                 static_cast<std::uint32_t>(bytesWritten >> 20));
    614  1.1  christos     }
    615  1.1  christos   }
    616  1.1  christos   return bytesWritten;
    617  1.1  christos }
    618  1.1  christos }
    619