Pzstd.cpp revision 1.1 1 1.1 christos /*
2 1.1 christos * Copyright (c) Meta Platforms, Inc. and affiliates.
3 1.1 christos * All rights reserved.
4 1.1 christos *
5 1.1 christos * This source code is licensed under both the BSD-style license (found in the
6 1.1 christos * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 1.1 christos * in the COPYING file in the root directory of this source tree).
8 1.1 christos */
9 1.1 christos #include "platform.h" /* Large Files support, SET_BINARY_MODE */
10 1.1 christos #include "Pzstd.h"
11 1.1 christos #include "SkippableFrame.h"
12 1.1 christos #include "utils/FileSystem.h"
13 1.1 christos #include "utils/Portability.h"
14 1.1 christos #include "utils/Range.h"
15 1.1 christos #include "utils/ScopeGuard.h"
16 1.1 christos #include "utils/ThreadPool.h"
17 1.1 christos #include "utils/WorkQueue.h"
18 1.1 christos
19 1.1 christos #include <algorithm>
20 1.1 christos #include <chrono>
21 1.1 christos #include <cinttypes>
22 1.1 christos #include <cstddef>
23 1.1 christos #include <cstdio>
24 1.1 christos #include <memory>
25 1.1 christos #include <string>
26 1.1 christos
27 1.1 christos
28 1.1 christos namespace pzstd {
29 1.1 christos
namespace {
/// Name of the platform's null device. Output sent there is exempt from the
/// "output file already exists" check in openOutputFile().
#if defined(_WIN32)
const std::string nullOutput("nul");
#else
const std::string nullOutput("/dev/null");
#endif
} // anonymous namespace

using std::size_t;
39 1.1 christos
40 1.1 christos static std::uintmax_t fileSizeOrZero(const std::string &file) {
41 1.1 christos if (file == "-") {
42 1.1 christos return 0;
43 1.1 christos }
44 1.1 christos std::error_code ec;
45 1.1 christos auto size = file_size(file, ec);
46 1.1 christos if (ec) {
47 1.1 christos size = 0;
48 1.1 christos }
49 1.1 christos return size;
50 1.1 christos }
51 1.1 christos
52 1.1 christos static std::uint64_t handleOneInput(const Options &options,
53 1.1 christos const std::string &inputFile,
54 1.1 christos FILE* inputFd,
55 1.1 christos const std::string &outputFile,
56 1.1 christos FILE* outputFd,
57 1.1 christos SharedState& state) {
58 1.1 christos auto inputSize = fileSizeOrZero(inputFile);
59 1.1 christos // WorkQueue outlives ThreadPool so in the case of error we are certain
60 1.1 christos // we don't accidentally try to call push() on it after it is destroyed
61 1.1 christos WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
62 1.1 christos std::uint64_t bytesRead;
63 1.1 christos std::uint64_t bytesWritten;
64 1.1 christos {
65 1.1 christos // Initialize the (de)compression thread pool with numThreads
66 1.1 christos ThreadPool executor(options.numThreads);
67 1.1 christos // Run the reader thread on an extra thread
68 1.1 christos ThreadPool readExecutor(1);
69 1.1 christos if (!options.decompress) {
70 1.1 christos // Add a job that reads the input and starts all the compression jobs
71 1.1 christos readExecutor.add(
72 1.1 christos [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
73 1.1 christos bytesRead = asyncCompressChunks(
74 1.1 christos state,
75 1.1 christos outs,
76 1.1 christos executor,
77 1.1 christos inputFd,
78 1.1 christos inputSize,
79 1.1 christos options.numThreads,
80 1.1 christos options.determineParameters());
81 1.1 christos });
82 1.1 christos // Start writing
83 1.1 christos bytesWritten = writeFile(state, outs, outputFd, options.decompress);
84 1.1 christos } else {
85 1.1 christos // Add a job that reads the input and starts all the decompression jobs
86 1.1 christos readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
87 1.1 christos bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
88 1.1 christos });
89 1.1 christos // Start writing
90 1.1 christos bytesWritten = writeFile(state, outs, outputFd, options.decompress);
91 1.1 christos }
92 1.1 christos }
93 1.1 christos if (!state.errorHolder.hasError()) {
94 1.1 christos std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
95 1.1 christos std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
96 1.1 christos if (!options.decompress) {
97 1.1 christos double ratio = static_cast<double>(bytesWritten) /
98 1.1 christos static_cast<double>(bytesRead + !bytesRead);
99 1.1 christos state.log(kLogInfo, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64
100 1.1 christos " bytes, %s)\n",
101 1.1 christos inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
102 1.1 christos outputFileName.c_str());
103 1.1 christos } else {
104 1.1 christos state.log(kLogInfo, "%-20s: %" PRIu64 " bytes \n",
105 1.1 christos inputFileName.c_str(),bytesWritten);
106 1.1 christos }
107 1.1 christos }
108 1.1 christos return bytesWritten;
109 1.1 christos }
110 1.1 christos
111 1.1 christos static FILE *openInputFile(const std::string &inputFile,
112 1.1 christos ErrorHolder &errorHolder) {
113 1.1 christos if (inputFile == "-") {
114 1.1 christos SET_BINARY_MODE(stdin);
115 1.1 christos return stdin;
116 1.1 christos }
117 1.1 christos // Check if input file is a directory
118 1.1 christos {
119 1.1 christos std::error_code ec;
120 1.1 christos if (is_directory(inputFile, ec)) {
121 1.1 christos errorHolder.setError("Output file is a directory -- ignored");
122 1.1 christos return nullptr;
123 1.1 christos }
124 1.1 christos }
125 1.1 christos auto inputFd = std::fopen(inputFile.c_str(), "rb");
126 1.1 christos if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) {
127 1.1 christos return nullptr;
128 1.1 christos }
129 1.1 christos return inputFd;
130 1.1 christos }
131 1.1 christos
132 1.1 christos static FILE *openOutputFile(const Options &options,
133 1.1 christos const std::string &outputFile,
134 1.1 christos SharedState& state) {
135 1.1 christos if (outputFile == "-") {
136 1.1 christos SET_BINARY_MODE(stdout);
137 1.1 christos return stdout;
138 1.1 christos }
139 1.1 christos // Check if the output file exists and then open it
140 1.1 christos if (!options.overwrite && outputFile != nullOutput) {
141 1.1 christos auto outputFd = std::fopen(outputFile.c_str(), "rb");
142 1.1 christos if (outputFd != nullptr) {
143 1.1 christos std::fclose(outputFd);
144 1.1 christos if (!state.log.logsAt(kLogInfo)) {
145 1.1 christos state.errorHolder.setError("Output file exists");
146 1.1 christos return nullptr;
147 1.1 christos }
148 1.1 christos state.log(
149 1.1 christos kLogInfo,
150 1.1 christos "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
151 1.1 christos outputFile.c_str());
152 1.1 christos int c = getchar();
153 1.1 christos if (c != 'y' && c != 'Y') {
154 1.1 christos state.errorHolder.setError("Not overwritten");
155 1.1 christos return nullptr;
156 1.1 christos }
157 1.1 christos }
158 1.1 christos }
159 1.1 christos auto outputFd = std::fopen(outputFile.c_str(), "wb");
160 1.1 christos if (!state.errorHolder.check(
161 1.1 christos outputFd != nullptr, "Failed to open output file")) {
162 1.1 christos return nullptr;
163 1.1 christos }
164 1.1 christos return outputFd;
165 1.1 christos }
166 1.1 christos
167 1.1 christos int pzstdMain(const Options &options) {
168 1.1 christos int returnCode = 0;
169 1.1 christos SharedState state(options);
170 1.1 christos for (const auto& input : options.inputFiles) {
171 1.1 christos // Setup the shared state
172 1.1 christos auto printErrorGuard = makeScopeGuard([&] {
173 1.1 christos if (state.errorHolder.hasError()) {
174 1.1 christos returnCode = 1;
175 1.1 christos state.log(kLogError, "pzstd: %s: %s.\n", input.c_str(),
176 1.1 christos state.errorHolder.getError().c_str());
177 1.1 christos }
178 1.1 christos });
179 1.1 christos // Open the input file
180 1.1 christos auto inputFd = openInputFile(input, state.errorHolder);
181 1.1 christos if (inputFd == nullptr) {
182 1.1 christos continue;
183 1.1 christos }
184 1.1 christos auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
185 1.1 christos // Open the output file
186 1.1 christos auto outputFile = options.getOutputFile(input);
187 1.1 christos if (!state.errorHolder.check(outputFile != "",
188 1.1 christos "Input file does not have extension .zst")) {
189 1.1 christos continue;
190 1.1 christos }
191 1.1 christos auto outputFd = openOutputFile(options, outputFile, state);
192 1.1 christos if (outputFd == nullptr) {
193 1.1 christos continue;
194 1.1 christos }
195 1.1 christos auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
196 1.1 christos // (de)compress the file
197 1.1 christos handleOneInput(options, input, inputFd, outputFile, outputFd, state);
198 1.1 christos if (state.errorHolder.hasError()) {
199 1.1 christos continue;
200 1.1 christos }
201 1.1 christos // Delete the input file if necessary
202 1.1 christos if (!options.keepSource) {
203 1.1 christos // Be sure that we are done and have written everything before we delete
204 1.1 christos if (!state.errorHolder.check(std::fclose(inputFd) == 0,
205 1.1 christos "Failed to close input file")) {
206 1.1 christos continue;
207 1.1 christos }
208 1.1 christos closeInputGuard.dismiss();
209 1.1 christos if (!state.errorHolder.check(std::fclose(outputFd) == 0,
210 1.1 christos "Failed to close output file")) {
211 1.1 christos continue;
212 1.1 christos }
213 1.1 christos closeOutputGuard.dismiss();
214 1.1 christos if (std::remove(input.c_str()) != 0) {
215 1.1 christos state.errorHolder.setError("Failed to remove input file");
216 1.1 christos continue;
217 1.1 christos }
218 1.1 christos }
219 1.1 christos }
220 1.1 christos // Returns 1 if any of the files failed to (de)compress.
221 1.1 christos return returnCode;
222 1.1 christos }
223 1.1 christos
224 1.1 christos /// Construct a `ZSTD_inBuffer` that points to the data in `buffer`.
225 1.1 christos static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) {
226 1.1 christos return ZSTD_inBuffer{buffer.data(), buffer.size(), 0};
227 1.1 christos }
228 1.1 christos
229 1.1 christos /**
230 1.1 christos * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by
231 1.1 christos * `inBuffer.pos`.
232 1.1 christos */
233 1.1 christos void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) {
234 1.1 christos auto pos = inBuffer.pos;
235 1.1 christos inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + pos;
236 1.1 christos inBuffer.size -= pos;
237 1.1 christos inBuffer.pos = 0;
238 1.1 christos return buffer.advance(pos);
239 1.1 christos }
240 1.1 christos
241 1.1 christos /// Construct a `ZSTD_outBuffer` that points to the data in `buffer`.
242 1.1 christos static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) {
243 1.1 christos return ZSTD_outBuffer{buffer.data(), buffer.size(), 0};
244 1.1 christos }
245 1.1 christos
246 1.1 christos /**
247 1.1 christos * Split `buffer` and advance `outBuffer` by the amount of data written, as
248 1.1 christos * indicated by `outBuffer.pos`.
249 1.1 christos */
250 1.1 christos Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
251 1.1 christos auto pos = outBuffer.pos;
252 1.1 christos outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + pos;
253 1.1 christos outBuffer.size -= pos;
254 1.1 christos outBuffer.pos = 0;
255 1.1 christos return buffer.splitAt(pos);
256 1.1 christos }
257 1.1 christos
258 1.1 christos /**
259 1.1 christos * Stream chunks of input from `in`, compress it, and stream it out to `out`.
260 1.1 christos *
261 1.1 christos * @param state The shared state
262 1.1 christos * @param in Queue that we `pop()` input buffers from
263 1.1 christos * @param out Queue that we `push()` compressed output buffers to
264 1.1 christos * @param maxInputSize An upper bound on the size of the input
265 1.1 christos */
266 1.1 christos static void compress(
267 1.1 christos SharedState& state,
268 1.1 christos std::shared_ptr<BufferWorkQueue> in,
269 1.1 christos std::shared_ptr<BufferWorkQueue> out,
270 1.1 christos size_t maxInputSize) {
271 1.1 christos auto& errorHolder = state.errorHolder;
272 1.1 christos auto guard = makeScopeGuard([&] { out->finish(); });
273 1.1 christos // Initialize the CCtx
274 1.1 christos auto ctx = state.cStreamPool->get();
275 1.1 christos if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
276 1.1 christos return;
277 1.1 christos }
278 1.1 christos {
279 1.1 christos auto err = ZSTD_CCtx_reset(ctx.get(), ZSTD_reset_session_only);
280 1.1 christos if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
281 1.1 christos return;
282 1.1 christos }
283 1.1 christos }
284 1.1 christos
285 1.1 christos // Allocate space for the result
286 1.1 christos auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize));
287 1.1 christos auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
288 1.1 christos {
289 1.1 christos Buffer inBuffer;
290 1.1 christos // Read a buffer in from the input queue
291 1.1 christos while (in->pop(inBuffer) && !errorHolder.hasError()) {
292 1.1 christos auto zstdInBuffer = makeZstdInBuffer(inBuffer);
293 1.1 christos // Compress the whole buffer and send it to the output queue
294 1.1 christos while (!inBuffer.empty() && !errorHolder.hasError()) {
295 1.1 christos if (!errorHolder.check(
296 1.1 christos !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
297 1.1 christos return;
298 1.1 christos }
299 1.1 christos // Compress
300 1.1 christos auto err =
301 1.1 christos ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
302 1.1 christos if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
303 1.1 christos return;
304 1.1 christos }
305 1.1 christos // Split the compressed data off outBuffer and pass to the output queue
306 1.1 christos out->push(split(outBuffer, zstdOutBuffer));
307 1.1 christos // Forget about the data we already compressed
308 1.1 christos advance(inBuffer, zstdInBuffer);
309 1.1 christos }
310 1.1 christos }
311 1.1 christos }
312 1.1 christos // Write the epilog
313 1.1 christos size_t bytesLeft;
314 1.1 christos do {
315 1.1 christos if (!errorHolder.check(
316 1.1 christos !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
317 1.1 christos return;
318 1.1 christos }
319 1.1 christos bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer);
320 1.1 christos if (!errorHolder.check(
321 1.1 christos !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) {
322 1.1 christos return;
323 1.1 christos }
324 1.1 christos out->push(split(outBuffer, zstdOutBuffer));
325 1.1 christos } while (bytesLeft != 0 && !errorHolder.hasError());
326 1.1 christos }
327 1.1 christos
328 1.1 christos /**
329 1.1 christos * Calculates how large each independently compressed frame should be.
330 1.1 christos *
331 1.1 christos * @param size The size of the source if known, 0 otherwise
332 1.1 christos * @param numThreads The number of threads available to run compression jobs on
333 1.1 christos * @param params The zstd parameters to be used for compression
334 1.1 christos */
335 1.1 christos static size_t calculateStep(
336 1.1 christos std::uintmax_t size,
337 1.1 christos size_t numThreads,
338 1.1 christos const ZSTD_parameters ¶ms) {
339 1.1 christos (void)size;
340 1.1 christos (void)numThreads;
341 1.1 christos // Not validated to work correctly for window logs > 23.
342 1.1 christos // It will definitely fail if windowLog + 2 is >= 4GB because
343 1.1 christos // the skippable frame can only store sizes up to 4GB.
344 1.1 christos assert(params.cParams.windowLog <= 23);
345 1.1 christos return size_t{1} << (params.cParams.windowLog + 2);
346 1.1 christos }
347 1.1 christos
namespace {
/// What a stream's indicators say: keep reading, end of file, or error.
enum class FileStatus { Continue, Done, Error };

/// Determines the status of the stream `fd`: `Done` when its end-of-file
/// indicator is set, otherwise `Error` when its error indicator is set,
/// otherwise `Continue`.
FileStatus fileStatus(FILE* fd) {
  if (std::feof(fd) != 0) {
    return FileStatus::Done;
  }
  return std::ferror(fd) != 0 ? FileStatus::Error : FileStatus::Continue;
}
} // anonymous namespace
360 1.1 christos
361 1.1 christos /**
362 1.1 christos * Reads `size` data in chunks of `chunkSize` and puts it into `queue`.
363 1.1 christos * Will read less if an error or EOF occurs.
364 1.1 christos * Returns the status of the file after all of the reads have occurred.
365 1.1 christos */
366 1.1 christos static FileStatus
367 1.1 christos readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
368 1.1 christos std::uint64_t *totalBytesRead) {
369 1.1 christos Buffer buffer(size);
370 1.1 christos while (!buffer.empty()) {
371 1.1 christos auto bytesRead =
372 1.1 christos std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
373 1.1 christos *totalBytesRead += bytesRead;
374 1.1 christos queue.push(buffer.splitAt(bytesRead));
375 1.1 christos auto status = fileStatus(fd);
376 1.1 christos if (status != FileStatus::Continue) {
377 1.1 christos return status;
378 1.1 christos }
379 1.1 christos }
380 1.1 christos return FileStatus::Continue;
381 1.1 christos }
382 1.1 christos
383 1.1 christos std::uint64_t asyncCompressChunks(
384 1.1 christos SharedState& state,
385 1.1 christos WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
386 1.1 christos ThreadPool& executor,
387 1.1 christos FILE* fd,
388 1.1 christos std::uintmax_t size,
389 1.1 christos size_t numThreads,
390 1.1 christos ZSTD_parameters params) {
391 1.1 christos auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
392 1.1 christos std::uint64_t bytesRead = 0;
393 1.1 christos
394 1.1 christos // Break the input up into chunks of size `step` and compress each chunk
395 1.1 christos // independently.
396 1.1 christos size_t step = calculateStep(size, numThreads, params);
397 1.1 christos state.log(kLogDebug, "Chosen frame size: %zu\n", step);
398 1.1 christos auto status = FileStatus::Continue;
399 1.1 christos while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
400 1.1 christos // Make a new input queue that we will put the chunk's input data into.
401 1.1 christos auto in = std::make_shared<BufferWorkQueue>();
402 1.1 christos auto inGuard = makeScopeGuard([&] { in->finish(); });
403 1.1 christos // Make a new output queue that compress will put the compressed data into.
404 1.1 christos auto out = std::make_shared<BufferWorkQueue>();
405 1.1 christos // Start compression in the thread pool
406 1.1 christos executor.add([&state, in, out, step] {
407 1.1 christos return compress(
408 1.1 christos state, std::move(in), std::move(out), step);
409 1.1 christos });
410 1.1 christos // Pass the output queue to the writer thread.
411 1.1 christos chunks.push(std::move(out));
412 1.1 christos state.log(kLogVerbose, "%s\n", "Starting a new frame");
413 1.1 christos // Fill the input queue for the compression job we just started
414 1.1 christos status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
415 1.1 christos }
416 1.1 christos state.errorHolder.check(status != FileStatus::Error, "Error reading input");
417 1.1 christos return bytesRead;
418 1.1 christos }
419 1.1 christos
420 1.1 christos /**
421 1.1 christos * Decompress a frame, whose data is streamed into `in`, and stream the output
422 1.1 christos * to `out`.
423 1.1 christos *
424 1.1 christos * @param state The shared state
425 1.1 christos * @param in Queue that we `pop()` input buffers from. It contains
426 1.1 christos * exactly one compressed frame.
427 1.1 christos * @param out Queue that we `push()` decompressed output buffers to
428 1.1 christos */
429 1.1 christos static void decompress(
430 1.1 christos SharedState& state,
431 1.1 christos std::shared_ptr<BufferWorkQueue> in,
432 1.1 christos std::shared_ptr<BufferWorkQueue> out) {
433 1.1 christos auto& errorHolder = state.errorHolder;
434 1.1 christos auto guard = makeScopeGuard([&] { out->finish(); });
435 1.1 christos // Initialize the DCtx
436 1.1 christos auto ctx = state.dStreamPool->get();
437 1.1 christos if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
438 1.1 christos return;
439 1.1 christos }
440 1.1 christos {
441 1.1 christos auto err = ZSTD_DCtx_reset(ctx.get(), ZSTD_reset_session_only);
442 1.1 christos if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
443 1.1 christos return;
444 1.1 christos }
445 1.1 christos }
446 1.1 christos
447 1.1 christos const size_t outSize = ZSTD_DStreamOutSize();
448 1.1 christos Buffer inBuffer;
449 1.1 christos size_t returnCode = 0;
450 1.1 christos // Read a buffer in from the input queue
451 1.1 christos while (in->pop(inBuffer) && !errorHolder.hasError()) {
452 1.1 christos auto zstdInBuffer = makeZstdInBuffer(inBuffer);
453 1.1 christos // Decompress the whole buffer and send it to the output queue
454 1.1 christos while (!inBuffer.empty() && !errorHolder.hasError()) {
455 1.1 christos // Allocate a buffer with at least outSize bytes.
456 1.1 christos Buffer outBuffer(outSize);
457 1.1 christos auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
458 1.1 christos // Decompress
459 1.1 christos returnCode =
460 1.1 christos ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
461 1.1 christos if (!errorHolder.check(
462 1.1 christos !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
463 1.1 christos return;
464 1.1 christos }
465 1.1 christos // Pass the buffer with the decompressed data to the output queue
466 1.1 christos out->push(split(outBuffer, zstdOutBuffer));
467 1.1 christos // Advance past the input we already read
468 1.1 christos advance(inBuffer, zstdInBuffer);
469 1.1 christos if (returnCode == 0) {
470 1.1 christos // The frame is over, prepare to (maybe) start a new frame
471 1.1 christos ZSTD_initDStream(ctx.get());
472 1.1 christos }
473 1.1 christos }
474 1.1 christos }
475 1.1 christos if (!errorHolder.check(returnCode <= 1, "Incomplete block")) {
476 1.1 christos return;
477 1.1 christos }
478 1.1 christos // We've given ZSTD_decompressStream all of our data, but there may still
479 1.1 christos // be data to read.
480 1.1 christos while (returnCode == 1) {
481 1.1 christos // Allocate a buffer with at least outSize bytes.
482 1.1 christos Buffer outBuffer(outSize);
483 1.1 christos auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
484 1.1 christos // Pass in no input.
485 1.1 christos ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0};
486 1.1 christos // Decompress
487 1.1 christos returnCode =
488 1.1 christos ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
489 1.1 christos if (!errorHolder.check(
490 1.1 christos !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
491 1.1 christos return;
492 1.1 christos }
493 1.1 christos // Pass the buffer with the decompressed data to the output queue
494 1.1 christos out->push(split(outBuffer, zstdOutBuffer));
495 1.1 christos }
496 1.1 christos }
497 1.1 christos
498 1.1 christos std::uint64_t asyncDecompressFrames(
499 1.1 christos SharedState& state,
500 1.1 christos WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
501 1.1 christos ThreadPool& executor,
502 1.1 christos FILE* fd) {
503 1.1 christos auto framesGuard = makeScopeGuard([&] { frames.finish(); });
504 1.1 christos std::uint64_t totalBytesRead = 0;
505 1.1 christos
506 1.1 christos // Split the source up into its component frames.
507 1.1 christos // If we find our recognized skippable frame we know the next frames size
508 1.1 christos // which means that we can decompress each standard frame in independently.
509 1.1 christos // Otherwise, we will decompress using only one decompression task.
510 1.1 christos const size_t chunkSize = ZSTD_DStreamInSize();
511 1.1 christos auto status = FileStatus::Continue;
512 1.1 christos while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
513 1.1 christos // Make a new input queue that we will put the frames's bytes into.
514 1.1 christos auto in = std::make_shared<BufferWorkQueue>();
515 1.1 christos auto inGuard = makeScopeGuard([&] { in->finish(); });
516 1.1 christos // Make a output queue that decompress will put the decompressed data into
517 1.1 christos auto out = std::make_shared<BufferWorkQueue>();
518 1.1 christos
519 1.1 christos size_t frameSize;
520 1.1 christos {
521 1.1 christos // Calculate the size of the next frame.
522 1.1 christos // frameSize is 0 if the frame info can't be decoded.
523 1.1 christos Buffer buffer(SkippableFrame::kSize);
524 1.1 christos auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
525 1.1 christos totalBytesRead += bytesRead;
526 1.1 christos status = fileStatus(fd);
527 1.1 christos if (bytesRead == 0 && status != FileStatus::Continue) {
528 1.1 christos break;
529 1.1 christos }
530 1.1 christos buffer.subtract(buffer.size() - bytesRead);
531 1.1 christos frameSize = SkippableFrame::tryRead(buffer.range());
532 1.1 christos in->push(std::move(buffer));
533 1.1 christos }
534 1.1 christos if (frameSize == 0) {
535 1.1 christos // We hit a non SkippableFrame, so this will be the last job.
536 1.1 christos // Make sure that we don't use too much memory
537 1.1 christos in->setMaxSize(64);
538 1.1 christos out->setMaxSize(64);
539 1.1 christos }
540 1.1 christos // Start decompression in the thread pool
541 1.1 christos executor.add([&state, in, out] {
542 1.1 christos return decompress(state, std::move(in), std::move(out));
543 1.1 christos });
544 1.1 christos // Pass the output queue to the writer thread
545 1.1 christos frames.push(std::move(out));
546 1.1 christos if (frameSize == 0) {
547 1.1 christos // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
548 1.1 christos // Pass the rest of the source to this decompression task
549 1.1 christos state.log(kLogVerbose, "%s\n",
550 1.1 christos "Input not in pzstd format, falling back to serial decompression");
551 1.1 christos while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
552 1.1 christos status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
553 1.1 christos }
554 1.1 christos break;
555 1.1 christos }
556 1.1 christos state.log(kLogVerbose, "Decompressing a frame of size %zu", frameSize);
557 1.1 christos // Fill the input queue for the decompression job we just started
558 1.1 christos status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
559 1.1 christos }
560 1.1 christos state.errorHolder.check(status != FileStatus::Error, "Error reading input");
561 1.1 christos return totalBytesRead;
562 1.1 christos }
563 1.1 christos
564 1.1 christos /// Write `data` to `fd`, returns true iff success.
565 1.1 christos static bool writeData(ByteRange data, FILE* fd) {
566 1.1 christos while (!data.empty()) {
567 1.1 christos data.advance(std::fwrite(data.begin(), 1, data.size(), fd));
568 1.1 christos if (std::ferror(fd)) {
569 1.1 christos return false;
570 1.1 christos }
571 1.1 christos }
572 1.1 christos return true;
573 1.1 christos }
574 1.1 christos
575 1.1 christos std::uint64_t writeFile(
576 1.1 christos SharedState& state,
577 1.1 christos WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
578 1.1 christos FILE* outputFd,
579 1.1 christos bool decompress) {
580 1.1 christos auto& errorHolder = state.errorHolder;
581 1.1 christos auto lineClearGuard = makeScopeGuard([&state] {
582 1.1 christos state.log.clear(kLogInfo);
583 1.1 christos });
584 1.1 christos std::uint64_t bytesWritten = 0;
585 1.1 christos std::shared_ptr<BufferWorkQueue> out;
586 1.1 christos // Grab the output queue for each decompression job (in order).
587 1.1 christos while (outs.pop(out)) {
588 1.1 christos if (errorHolder.hasError()) {
589 1.1 christos continue;
590 1.1 christos }
591 1.1 christos if (!decompress) {
592 1.1 christos // If we are compressing and want to write skippable frames we can't
593 1.1 christos // start writing before compression is done because we need to know the
594 1.1 christos // compressed size.
595 1.1 christos // Wait for the compressed size to be available and write skippable frame
596 1.1 christos assert(uint64_t(out->size()) < uint64_t(1) << 32);
597 1.1 christos SkippableFrame frame(uint32_t(out->size()));
598 1.1 christos if (!writeData(frame.data(), outputFd)) {
599 1.1 christos errorHolder.setError("Failed to write output");
600 1.1 christos return bytesWritten;
601 1.1 christos }
602 1.1 christos bytesWritten += frame.kSize;
603 1.1 christos }
604 1.1 christos // For each chunk of the frame: Pop it from the queue and write it
605 1.1 christos Buffer buffer;
606 1.1 christos while (out->pop(buffer) && !errorHolder.hasError()) {
607 1.1 christos if (!writeData(buffer.range(), outputFd)) {
608 1.1 christos errorHolder.setError("Failed to write output");
609 1.1 christos return bytesWritten;
610 1.1 christos }
611 1.1 christos bytesWritten += buffer.size();
612 1.1 christos state.log.update(kLogInfo, "Written: %u MB ",
613 1.1 christos static_cast<std::uint32_t>(bytesWritten >> 20));
614 1.1 christos }
615 1.1 christos }
616 1.1 christos return bytesWritten;
617 1.1 christos }
618 1.1 christos }
619