1 1.1 christos /* 2 1.1 christos * Copyright (c) Meta Platforms, Inc. and affiliates. 3 1.1 christos * All rights reserved. 4 1.1 christos * 5 1.1 christos * This source code is licensed under both the BSD-style license (found in the 6 1.1 christos * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7 1.1 christos * in the COPYING file in the root directory of this source tree). 8 1.1 christos */ 9 1.1 christos #include "platform.h" /* Large Files support, SET_BINARY_MODE */ 10 1.1 christos #include "Pzstd.h" 11 1.1 christos #include "SkippableFrame.h" 12 1.1 christos #include "utils/FileSystem.h" 13 1.1 christos #include "utils/Portability.h" 14 1.1 christos #include "utils/Range.h" 15 1.1 christos #include "utils/ScopeGuard.h" 16 1.1 christos #include "utils/ThreadPool.h" 17 1.1 christos #include "utils/WorkQueue.h" 18 1.1 christos 19 1.1 christos #include <algorithm> 20 1.1 christos #include <chrono> 21 1.1 christos #include <cinttypes> 22 1.1 christos #include <cstddef> 23 1.1 christos #include <cstdio> 24 1.1 christos #include <memory> 25 1.1 christos #include <string> 26 1.1 christos 27 1.1 christos 28 1.1 christos namespace pzstd { 29 1.1 christos 30 1.1 christos namespace { 31 1.1 christos #ifdef _WIN32 32 1.1 christos const std::string nullOutput = "nul"; 33 1.1 christos #else 34 1.1 christos const std::string nullOutput = "/dev/null"; 35 1.1 christos #endif 36 1.1 christos } 37 1.1 christos 38 1.1 christos using std::size_t; 39 1.1 christos 40 1.1 christos static std::uintmax_t fileSizeOrZero(const std::string &file) { 41 1.1 christos if (file == "-") { 42 1.1 christos return 0; 43 1.1 christos } 44 1.1 christos std::error_code ec; 45 1.1 christos auto size = file_size(file, ec); 46 1.1 christos if (ec) { 47 1.1 christos size = 0; 48 1.1 christos } 49 1.1 christos return size; 50 1.1 christos } 51 1.1 christos 52 1.1 christos static std::uint64_t handleOneInput(const Options &options, 53 1.1 christos const std::string &inputFile, 54 1.1 christos FILE* inputFd, 55 1.1 christos const std::string &outputFile, 56 1.1 christos FILE* outputFd, 57 1.1 christos SharedState& state) { 58 1.1 christos auto inputSize = fileSizeOrZero(inputFile); 59 1.1 christos // WorkQueue outlives ThreadPool so in the case of error we are certain 60 1.1 christos // we don't accidentally try to call push() on it after it is destroyed 61 1.1 christos WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1}; 62 1.1 christos std::uint64_t bytesRead; 63 1.1 christos std::uint64_t bytesWritten; 64 1.1 christos { 65 1.1 christos // Initialize the (de)compression thread pool with numThreads 66 1.1 christos ThreadPool executor(options.numThreads); 67 1.1 christos // Run the reader thread on an extra thread 68 1.1 christos ThreadPool readExecutor(1); 69 1.1 christos if (!options.decompress) { 70 1.1 christos // Add a job that reads the input and starts all the compression jobs 71 1.1 christos readExecutor.add( 72 1.1 christos [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] { 73 1.1 christos bytesRead = asyncCompressChunks( 74 1.1 christos state, 75 1.1 christos outs, 76 1.1 christos executor, 77 1.1 christos inputFd, 78 1.1 christos inputSize, 79 1.1 christos options.numThreads, 80 1.1 christos options.determineParameters()); 81 1.1 christos }); 82 1.1 christos // Start writing 83 1.1 christos bytesWritten = writeFile(state, outs, outputFd, options.decompress); 84 1.1 christos } else { 85 1.1 christos // Add a job that reads the input and starts all the decompression jobs 86 1.1 christos readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] { 87 1.1 christos bytesRead = asyncDecompressFrames(state, outs, executor, inputFd); 88 1.1 christos }); 89 1.1 christos // Start writing 90 1.1 christos bytesWritten = writeFile(state, outs, outputFd, options.decompress); 91 1.1 christos } 92 1.1 christos } 93 1.1 christos if (!state.errorHolder.hasError()) { 94 1.1 christos std::string inputFileName = inputFile == "-" ? "stdin" : inputFile; 95 1.1 christos std::string outputFileName = outputFile == "-" ? "stdout" : outputFile; 96 1.1 christos if (!options.decompress) { 97 1.1 christos double ratio = static_cast<double>(bytesWritten) / 98 1.1 christos static_cast<double>(bytesRead + !bytesRead); 99 1.1 christos state.log(kLogInfo, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64 100 1.1 christos " bytes, %s)\n", 101 1.1 christos inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten, 102 1.1 christos outputFileName.c_str()); 103 1.1 christos } else { 104 1.1 christos state.log(kLogInfo, "%-20s: %" PRIu64 " bytes \n", 105 1.1 christos inputFileName.c_str(),bytesWritten); 106 1.1 christos } 107 1.1 christos } 108 1.1 christos return bytesWritten; 109 1.1 christos } 110 1.1 christos 111 1.1 christos static FILE *openInputFile(const std::string &inputFile, 112 1.1 christos ErrorHolder &errorHolder) { 113 1.1 christos if (inputFile == "-") { 114 1.1 christos SET_BINARY_MODE(stdin); 115 1.1 christos return stdin; 116 1.1 christos } 117 1.1 christos // Check if input file is a directory 118 1.1 christos { 119 1.1 christos std::error_code ec; 120 1.1 christos if (is_directory(inputFile, ec)) { 121 1.1 christos errorHolder.setError("Output file is a directory -- ignored"); 122 1.1 christos return nullptr; 123 1.1 christos } 124 1.1 christos } 125 1.1 christos auto inputFd = std::fopen(inputFile.c_str(), "rb"); 126 1.1 christos if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) { 127 1.1 christos return nullptr; 128 1.1 christos } 129 1.1 christos return inputFd; 130 1.1 christos } 131 1.1 christos 132 1.1 christos static FILE *openOutputFile(const Options &options, 133 1.1 christos const std::string &outputFile, 134 1.1 christos SharedState& state) { 135 1.1 christos if (outputFile == "-") { 136 1.1 christos SET_BINARY_MODE(stdout); 137 1.1 christos return stdout; 138 1.1 christos } 139 1.1 christos // Check if the output file exists and then open it 140 1.1 christos if (!options.overwrite && outputFile != nullOutput) { 141 1.1 christos auto outputFd = std::fopen(outputFile.c_str(), "rb"); 142 1.1 christos if (outputFd != nullptr) { 143 1.1 christos std::fclose(outputFd); 144 1.1 christos if (!state.log.logsAt(kLogInfo)) { 145 1.1 christos state.errorHolder.setError("Output file exists"); 146 1.1 christos return nullptr; 147 1.1 christos } 148 1.1 christos state.log( 149 1.1 christos kLogInfo, 150 1.1 christos "pzstd: %s already exists; do you wish to overwrite (y/n) ? ", 151 1.1 christos outputFile.c_str()); 152 1.1 christos int c = getchar(); 153 1.1 christos if (c != 'y' && c != 'Y') { 154 1.1 christos state.errorHolder.setError("Not overwritten"); 155 1.1 christos return nullptr; 156 1.1 christos } 157 1.1 christos } 158 1.1 christos } 159 1.1 christos auto outputFd = std::fopen(outputFile.c_str(), "wb"); 160 1.1 christos if (!state.errorHolder.check( 161 1.1 christos outputFd != nullptr, "Failed to open output file")) { 162 1.1 christos return nullptr; 163 1.1 christos } 164 1.1 christos return outputFd; 165 1.1 christos } 166 1.1 christos 167 1.1 christos int pzstdMain(const Options &options) { 168 1.1 christos int returnCode = 0; 169 1.1 christos SharedState state(options); 170 1.1 christos for (const auto& input : options.inputFiles) { 171 1.1 christos // Setup the shared state 172 1.1 christos auto printErrorGuard = makeScopeGuard([&] { 173 1.1 christos if (state.errorHolder.hasError()) { 174 1.1 christos returnCode = 1; 175 1.1 christos state.log(kLogError, "pzstd: %s: %s.\n", input.c_str(), 176 1.1 christos state.errorHolder.getError().c_str()); 177 1.1 christos } 178 1.1 christos }); 179 1.1 christos // Open the input file 180 1.1 christos auto inputFd = openInputFile(input, state.errorHolder); 181 1.1 christos if (inputFd == nullptr) { 182 1.1 christos continue; 183 1.1 christos } 184 1.1 christos auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); }); 185 1.1 christos // Open the output file 186 1.1 christos auto outputFile = options.getOutputFile(input); 187 1.1 christos if (!state.errorHolder.check(outputFile != "", 188 1.1 christos "Input file does not have extension .zst")) { 189 1.1 christos continue; 190 1.1 christos } 191 1.1 christos auto outputFd = openOutputFile(options, outputFile, state); 192 1.1 christos if (outputFd == nullptr) { 193 1.1 christos continue; 194 1.1 christos } 195 1.1 christos auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); }); 196 1.1 christos // (de)compress the file 197 1.1 christos handleOneInput(options, input, inputFd, outputFile, outputFd, state); 198 1.1 christos if (state.errorHolder.hasError()) { 199 1.1 christos continue; 200 1.1 christos } 201 1.1 christos // Delete the input file if necessary 202 1.1 christos if (!options.keepSource) { 203 1.1 christos // Be sure that we are done and have written everything before we delete 204 1.1 christos if (!state.errorHolder.check(std::fclose(inputFd) == 0, 205 1.1 christos "Failed to close input file")) { 206 1.1 christos continue; 207 1.1 christos } 208 1.1 christos closeInputGuard.dismiss(); 209 1.1 christos if (!state.errorHolder.check(std::fclose(outputFd) == 0, 210 1.1 christos "Failed to close output file")) { 211 1.1 christos continue; 212 1.1 christos } 213 1.1 christos closeOutputGuard.dismiss(); 214 1.1 christos if (std::remove(input.c_str()) != 0) { 215 1.1 christos state.errorHolder.setError("Failed to remove input file"); 216 1.1 christos continue; 217 1.1 christos } 218 1.1 christos } 219 1.1 christos } 220 1.1 christos // Returns 1 if any of the files failed to (de)compress. 221 1.1 christos return returnCode; 222 1.1 christos } 223 1.1 christos 224 1.1 christos /// Construct a `ZSTD_inBuffer` that points to the data in `buffer`. 225 1.1 christos static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) { 226 1.1 christos return ZSTD_inBuffer{buffer.data(), buffer.size(), 0}; 227 1.1 christos } 228 1.1 christos 229 1.1 christos /** 230 1.1 christos * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by 231 1.1 christos * `inBuffer.pos`. 232 1.1 christos */ 233 1.1 christos void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) { 234 1.1 christos auto pos = inBuffer.pos; 235 1.1 christos inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + pos; 236 1.1 christos inBuffer.size -= pos; 237 1.1 christos inBuffer.pos = 0; 238 1.1 christos return buffer.advance(pos); 239 1.1 christos } 240 1.1 christos 241 1.1 christos /// Construct a `ZSTD_outBuffer` that points to the data in `buffer`. 242 1.1 christos static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) { 243 1.1 christos return ZSTD_outBuffer{buffer.data(), buffer.size(), 0}; 244 1.1 christos } 245 1.1 christos 246 1.1 christos /** 247 1.1 christos * Split `buffer` and advance `outBuffer` by the amount of data written, as 248 1.1 christos * indicated by `outBuffer.pos`. 249 1.1 christos */ 250 1.1 christos Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) { 251 1.1 christos auto pos = outBuffer.pos; 252 1.1 christos outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + pos; 253 1.1 christos outBuffer.size -= pos; 254 1.1 christos outBuffer.pos = 0; 255 1.1 christos return buffer.splitAt(pos); 256 1.1 christos } 257 1.1 christos 258 1.1 christos /** 259 1.1 christos * Stream chunks of input from `in`, compress it, and stream it out to `out`. 260 1.1 christos * 261 1.1 christos * @param state The shared state 262 1.1 christos * @param in Queue that we `pop()` input buffers from 263 1.1 christos * @param out Queue that we `push()` compressed output buffers to 264 1.1 christos * @param maxInputSize An upper bound on the size of the input 265 1.1 christos */ 266 1.1 christos static void compress( 267 1.1 christos SharedState& state, 268 1.1 christos std::shared_ptr<BufferWorkQueue> in, 269 1.1 christos std::shared_ptr<BufferWorkQueue> out, 270 1.1 christos size_t maxInputSize) { 271 1.1 christos auto& errorHolder = state.errorHolder; 272 1.1 christos auto guard = makeScopeGuard([&] { out->finish(); }); 273 1.1 christos // Initialize the CCtx 274 1.1 christos auto ctx = state.cStreamPool->get(); 275 1.1 christos if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) { 276 1.1 christos return; 277 1.1 christos } 278 1.1 christos { 279 1.1 christos auto err = ZSTD_CCtx_reset(ctx.get(), ZSTD_reset_session_only); 280 1.1 christos if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) { 281 1.1 christos return; 282 1.1 christos } 283 1.1 christos } 284 1.1 christos 285 1.1 christos // Allocate space for the result 286 1.1 christos auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize)); 287 1.1 christos auto zstdOutBuffer = makeZstdOutBuffer(outBuffer); 288 1.1 christos { 289 1.1 christos Buffer inBuffer; 290 1.1 christos // Read a buffer in from the input queue 291 1.1 christos while (in->pop(inBuffer) && !errorHolder.hasError()) { 292 1.1 christos auto zstdInBuffer = makeZstdInBuffer(inBuffer); 293 1.1 christos // Compress the whole buffer and send it to the output queue 294 1.1 christos while (!inBuffer.empty() && !errorHolder.hasError()) { 295 1.1 christos if (!errorHolder.check( 296 1.1 christos !outBuffer.empty(), "ZSTD_compressBound() was too small")) { 297 1.1 christos return; 298 1.1 christos } 299 1.1 christos // Compress 300 1.1 christos auto err = 301 1.1 christos ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer); 302 1.1 christos if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) { 303 1.1 christos return; 304 1.1 christos } 305 1.1 christos // Split the compressed data off outBuffer and pass to the output queue 306 1.1 christos out->push(split(outBuffer, zstdOutBuffer)); 307 1.1 christos // Forget about the data we already compressed 308 1.1 christos advance(inBuffer, zstdInBuffer); 309 1.1 christos } 310 1.1 christos } 311 1.1 christos } 312 1.1 christos // Write the epilog 313 1.1 christos size_t bytesLeft; 314 1.1 christos do { 315 1.1 christos if (!errorHolder.check( 316 1.1 christos !outBuffer.empty(), "ZSTD_compressBound() was too small")) { 317 1.1 christos return; 318 1.1 christos } 319 1.1 christos bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer); 320 1.1 christos if (!errorHolder.check( 321 1.1 christos !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) { 322 1.1 christos return; 323 1.1 christos } 324 1.1 christos out->push(split(outBuffer, zstdOutBuffer)); 325 1.1 christos } while (bytesLeft != 0 && !errorHolder.hasError()); 326 1.1 christos } 327 1.1 christos 328 1.1 christos /** 329 1.1 christos * Calculates how large each independently compressed frame should be. 330 1.1 christos * 331 1.1 christos * @param size The size of the source if known, 0 otherwise 332 1.1 christos * @param numThreads The number of threads available to run compression jobs on 333 1.1 christos * @param params The zstd parameters to be used for compression 334 1.1 christos */ 335 1.1 christos static size_t calculateStep( 336 1.1 christos std::uintmax_t size, 337 1.1 christos size_t numThreads, 338 1.1 christos const ZSTD_parameters ¶ms) { 339 1.1 christos (void)size; 340 1.1 christos (void)numThreads; 341 1.1 christos // Not validated to work correctly for window logs > 23. 342 1.1 christos // It will definitely fail if windowLog + 2 is >= 4GB because 343 1.1 christos // the skippable frame can only store sizes up to 4GB. 344 1.1 christos assert(params.cParams.windowLog <= 23); 345 1.1 christos return size_t{1} << (params.cParams.windowLog + 2); 346 1.1 christos } 347 1.1 christos 348 1.1 christos namespace { 349 1.1 christos enum class FileStatus { Continue, Done, Error }; 350 1.1 christos /// Determines the status of the file descriptor `fd`. 351 1.1 christos FileStatus fileStatus(FILE* fd) { 352 1.1 christos if (std::feof(fd)) { 353 1.1 christos return FileStatus::Done; 354 1.1 christos } else if (std::ferror(fd)) { 355 1.1 christos return FileStatus::Error; 356 1.1 christos } 357 1.1 christos return FileStatus::Continue; 358 1.1 christos } 359 1.1 christos } // anonymous namespace 360 1.1 christos 361 1.1 christos /** 362 1.1 christos * Reads `size` data in chunks of `chunkSize` and puts it into `queue`. 363 1.1 christos * Will read less if an error or EOF occurs. 364 1.1 christos * Returns the status of the file after all of the reads have occurred. 365 1.1 christos */ 366 1.1 christos static FileStatus 367 1.1 christos readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd, 368 1.1 christos std::uint64_t *totalBytesRead) { 369 1.1 christos Buffer buffer(size); 370 1.1 christos while (!buffer.empty()) { 371 1.1 christos auto bytesRead = 372 1.1 christos std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd); 373 1.1 christos *totalBytesRead += bytesRead; 374 1.1 christos queue.push(buffer.splitAt(bytesRead)); 375 1.1 christos auto status = fileStatus(fd); 376 1.1 christos if (status != FileStatus::Continue) { 377 1.1 christos return status; 378 1.1 christos } 379 1.1 christos } 380 1.1 christos return FileStatus::Continue; 381 1.1 christos } 382 1.1 christos 383 1.1 christos std::uint64_t asyncCompressChunks( 384 1.1 christos SharedState& state, 385 1.1 christos WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks, 386 1.1 christos ThreadPool& executor, 387 1.1 christos FILE* fd, 388 1.1 christos std::uintmax_t size, 389 1.1 christos size_t numThreads, 390 1.1 christos ZSTD_parameters params) { 391 1.1 christos auto chunksGuard = makeScopeGuard([&] { chunks.finish(); }); 392 1.1 christos std::uint64_t bytesRead = 0; 393 1.1 christos 394 1.1 christos // Break the input up into chunks of size `step` and compress each chunk 395 1.1 christos // independently. 396 1.1 christos size_t step = calculateStep(size, numThreads, params); 397 1.1 christos state.log(kLogDebug, "Chosen frame size: %zu\n", step); 398 1.1 christos auto status = FileStatus::Continue; 399 1.1 christos while (status == FileStatus::Continue && !state.errorHolder.hasError()) { 400 1.1 christos // Make a new input queue that we will put the chunk's input data into. 401 1.1 christos auto in = std::make_shared<BufferWorkQueue>(); 402 1.1 christos auto inGuard = makeScopeGuard([&] { in->finish(); }); 403 1.1 christos // Make a new output queue that compress will put the compressed data into. 404 1.1 christos auto out = std::make_shared<BufferWorkQueue>(); 405 1.1 christos // Start compression in the thread pool 406 1.1 christos executor.add([&state, in, out, step] { 407 1.1 christos return compress( 408 1.1 christos state, std::move(in), std::move(out), step); 409 1.1 christos }); 410 1.1 christos // Pass the output queue to the writer thread. 411 1.1 christos chunks.push(std::move(out)); 412 1.1 christos state.log(kLogVerbose, "%s\n", "Starting a new frame"); 413 1.1 christos // Fill the input queue for the compression job we just started 414 1.1 christos status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead); 415 1.1 christos } 416 1.1 christos state.errorHolder.check(status != FileStatus::Error, "Error reading input"); 417 1.1 christos return bytesRead; 418 1.1 christos } 419 1.1 christos 420 1.1 christos /** 421 1.1 christos * Decompress a frame, whose data is streamed into `in`, and stream the output 422 1.1 christos * to `out`. 423 1.1 christos * 424 1.1 christos * @param state The shared state 425 1.1 christos * @param in Queue that we `pop()` input buffers from. It contains 426 1.1 christos * exactly one compressed frame. 427 1.1 christos * @param out Queue that we `push()` decompressed output buffers to 428 1.1 christos */ 429 1.1 christos static void decompress( 430 1.1 christos SharedState& state, 431 1.1 christos std::shared_ptr<BufferWorkQueue> in, 432 1.1 christos std::shared_ptr<BufferWorkQueue> out) { 433 1.1 christos auto& errorHolder = state.errorHolder; 434 1.1 christos auto guard = makeScopeGuard([&] { out->finish(); }); 435 1.1 christos // Initialize the DCtx 436 1.1 christos auto ctx = state.dStreamPool->get(); 437 1.1 christos if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) { 438 1.1 christos return; 439 1.1 christos } 440 1.1 christos { 441 1.1 christos auto err = ZSTD_DCtx_reset(ctx.get(), ZSTD_reset_session_only); 442 1.1 christos if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) { 443 1.1 christos return; 444 1.1 christos } 445 1.1 christos } 446 1.1 christos 447 1.1 christos const size_t outSize = ZSTD_DStreamOutSize(); 448 1.1 christos Buffer inBuffer; 449 1.1 christos size_t returnCode = 0; 450 1.1 christos // Read a buffer in from the input queue 451 1.1 christos while (in->pop(inBuffer) && !errorHolder.hasError()) { 452 1.1 christos auto zstdInBuffer = makeZstdInBuffer(inBuffer); 453 1.1 christos // Decompress the whole buffer and send it to the output queue 454 1.1 christos while (!inBuffer.empty() && !errorHolder.hasError()) { 455 1.1 christos // Allocate a buffer with at least outSize bytes. 456 1.1 christos Buffer outBuffer(outSize); 457 1.1 christos auto zstdOutBuffer = makeZstdOutBuffer(outBuffer); 458 1.1 christos // Decompress 459 1.1 christos returnCode = 460 1.1 christos ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer); 461 1.1 christos if (!errorHolder.check( 462 1.1 christos !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) { 463 1.1 christos return; 464 1.1 christos } 465 1.1 christos // Pass the buffer with the decompressed data to the output queue 466 1.1 christos out->push(split(outBuffer, zstdOutBuffer)); 467 1.1 christos // Advance past the input we already read 468 1.1 christos advance(inBuffer, zstdInBuffer); 469 1.1 christos if (returnCode == 0) { 470 1.1 christos // The frame is over, prepare to (maybe) start a new frame 471 1.1 christos ZSTD_initDStream(ctx.get()); 472 1.1 christos } 473 1.1 christos } 474 1.1 christos } 475 1.1 christos if (!errorHolder.check(returnCode <= 1, "Incomplete block")) { 476 1.1 christos return; 477 1.1 christos } 478 1.1 christos // We've given ZSTD_decompressStream all of our data, but there may still 479 1.1 christos // be data to read. 480 1.1 christos while (returnCode == 1) { 481 1.1 christos // Allocate a buffer with at least outSize bytes. 482 1.1 christos Buffer outBuffer(outSize); 483 1.1 christos auto zstdOutBuffer = makeZstdOutBuffer(outBuffer); 484 1.1 christos // Pass in no input. 485 1.1 christos ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0}; 486 1.1 christos // Decompress 487 1.1 christos returnCode = 488 1.1 christos ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer); 489 1.1 christos if (!errorHolder.check( 490 1.1 christos !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) { 491 1.1 christos return; 492 1.1 christos } 493 1.1 christos // Pass the buffer with the decompressed data to the output queue 494 1.1 christos out->push(split(outBuffer, zstdOutBuffer)); 495 1.1 christos } 496 1.1 christos } 497 1.1 christos 498 1.1 christos std::uint64_t asyncDecompressFrames( 499 1.1 christos SharedState& state, 500 1.1 christos WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames, 501 1.1 christos ThreadPool& executor, 502 1.1 christos FILE* fd) { 503 1.1 christos auto framesGuard = makeScopeGuard([&] { frames.finish(); }); 504 1.1 christos std::uint64_t totalBytesRead = 0; 505 1.1 christos 506 1.1 christos // Split the source up into its component frames. 507 1.1 christos // If we find our recognized skippable frame we know the next frames size 508 1.1 christos // which means that we can decompress each standard frame in independently. 509 1.1 christos // Otherwise, we will decompress using only one decompression task. 510 1.1 christos const size_t chunkSize = ZSTD_DStreamInSize(); 511 1.1 christos auto status = FileStatus::Continue; 512 1.1 christos while (status == FileStatus::Continue && !state.errorHolder.hasError()) { 513 1.1 christos // Make a new input queue that we will put the frames's bytes into. 514 1.1 christos auto in = std::make_shared<BufferWorkQueue>(); 515 1.1 christos auto inGuard = makeScopeGuard([&] { in->finish(); }); 516 1.1 christos // Make a output queue that decompress will put the decompressed data into 517 1.1 christos auto out = std::make_shared<BufferWorkQueue>(); 518 1.1 christos 519 1.1 christos size_t frameSize; 520 1.1 christos { 521 1.1 christos // Calculate the size of the next frame. 522 1.1 christos // frameSize is 0 if the frame info can't be decoded. 523 1.1 christos Buffer buffer(SkippableFrame::kSize); 524 1.1 christos auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd); 525 1.1 christos totalBytesRead += bytesRead; 526 1.1 christos status = fileStatus(fd); 527 1.1 christos if (bytesRead == 0 && status != FileStatus::Continue) { 528 1.1 christos break; 529 1.1 christos } 530 1.1 christos buffer.subtract(buffer.size() - bytesRead); 531 1.1 christos frameSize = SkippableFrame::tryRead(buffer.range()); 532 1.1 christos in->push(std::move(buffer)); 533 1.1 christos } 534 1.1 christos if (frameSize == 0) { 535 1.1 christos // We hit a non SkippableFrame, so this will be the last job. 536 1.1 christos // Make sure that we don't use too much memory 537 1.1 christos in->setMaxSize(64); 538 1.1 christos out->setMaxSize(64); 539 1.1 christos } 540 1.1 christos // Start decompression in the thread pool 541 1.1 christos executor.add([&state, in, out] { 542 1.1 christos return decompress(state, std::move(in), std::move(out)); 543 1.1 christos }); 544 1.1 christos // Pass the output queue to the writer thread 545 1.1 christos frames.push(std::move(out)); 546 1.1 christos if (frameSize == 0) { 547 1.1 christos // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted 548 1.1 christos // Pass the rest of the source to this decompression task 549 1.1 christos state.log(kLogVerbose, "%s\n", 550 1.1 christos "Input not in pzstd format, falling back to serial decompression"); 551 1.1 christos while (status == FileStatus::Continue && !state.errorHolder.hasError()) { 552 1.1 christos status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead); 553 1.1 christos } 554 1.1 christos break; 555 1.1 christos } 556 1.1 christos state.log(kLogVerbose, "Decompressing a frame of size %zu", frameSize); 557 1.1 christos // Fill the input queue for the decompression job we just started 558 1.1 christos status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead); 559 1.1 christos } 560 1.1 christos state.errorHolder.check(status != FileStatus::Error, "Error reading input"); 561 1.1 christos return totalBytesRead; 562 1.1 christos } 563 1.1 christos 564 1.1 christos /// Write `data` to `fd`, returns true iff success. 565 1.1 christos static bool writeData(ByteRange data, FILE* fd) { 566 1.1 christos while (!data.empty()) { 567 1.1 christos data.advance(std::fwrite(data.begin(), 1, data.size(), fd)); 568 1.1 christos if (std::ferror(fd)) { 569 1.1 christos return false; 570 1.1 christos } 571 1.1 christos } 572 1.1 christos return true; 573 1.1 christos } 574 1.1 christos 575 1.1 christos std::uint64_t writeFile( 576 1.1 christos SharedState& state, 577 1.1 christos WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs, 578 1.1 christos FILE* outputFd, 579 1.1 christos bool decompress) { 580 1.1 christos auto& errorHolder = state.errorHolder; 581 1.1 christos auto lineClearGuard = makeScopeGuard([&state] { 582 1.1 christos state.log.clear(kLogInfo); 583 1.1 christos }); 584 1.1 christos std::uint64_t bytesWritten = 0; 585 1.1 christos std::shared_ptr<BufferWorkQueue> out; 586 1.1 christos // Grab the output queue for each decompression job (in order). 587 1.1 christos while (outs.pop(out)) { 588 1.1 christos if (errorHolder.hasError()) { 589 1.1 christos continue; 590 1.1 christos } 591 1.1 christos if (!decompress) { 592 1.1 christos // If we are compressing and want to write skippable frames we can't 593 1.1 christos // start writing before compression is done because we need to know the 594 1.1 christos // compressed size. 595 1.1 christos // Wait for the compressed size to be available and write skippable frame 596 1.1 christos assert(uint64_t(out->size()) < uint64_t(1) << 32); 597 1.1 christos SkippableFrame frame(uint32_t(out->size())); 598 1.1 christos if (!writeData(frame.data(), outputFd)) { 599 1.1 christos errorHolder.setError("Failed to write output"); 600 1.1 christos return bytesWritten; 601 1.1 christos } 602 1.1 christos bytesWritten += frame.kSize; 603 1.1 christos } 604 1.1 christos // For each chunk of the frame: Pop it from the queue and write it 605 1.1 christos Buffer buffer; 606 1.1 christos while (out->pop(buffer) && !errorHolder.hasError()) { 607 1.1 christos if (!writeData(buffer.range(), outputFd)) { 608 1.1 christos errorHolder.setError("Failed to write output"); 609 1.1 christos return bytesWritten; 610 1.1 christos } 611 1.1 christos bytesWritten += buffer.size(); 612 1.1 christos state.log.update(kLogInfo, "Written: %u MB ", 613 1.1 christos static_cast<std::uint32_t>(bytesWritten >> 20)); 614 1.1 christos } 615 1.1 christos } 616 1.1 christos return bytesWritten; 617 1.1 christos } 618 1.1 christos } 619