From 4fb2b51b7d93bc7bc95bf8edd4d710ce63231e24 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sun, 18 Feb 2024 18:19:49 +0100 Subject: [PATCH] switch to streaming compressor for data to drastically improve ratio. would still benefit from an abstract file refactor --- src/fragment_store/fragment_store.cpp | 74 ++++++++++++++++++++------- 1 file changed, 55 insertions(+), 19 deletions(-) diff --git a/src/fragment_store/fragment_store.cpp b/src/fragment_store/fragment_store.cpp index 4f705a2..6b4b40e 100644 --- a/src/fragment_store/fragment_store.cpp +++ b/src/fragment_store/fragment_store.cpp @@ -340,6 +340,10 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function buffer; - uint64_t buffer_actual_size {0}; - do { - buffer_actual_size = data_cb(buffer.data(), buffer.size()); - if (buffer_actual_size == 0) { - break; - } - if (buffer_actual_size > buffer.size()) { - // wtf - break; - } + if (data_comp == Compression::NONE) { + std::array buffer; + uint64_t buffer_actual_size {0}; + do { + buffer_actual_size = data_cb(buffer.data(), buffer.size()); + if (buffer_actual_size == 0) { + break; + } + if (buffer_actual_size > buffer.size()) { + // wtf + break; + } - if (data_comp == Compression::NONE) { data_file.write(reinterpret_cast(buffer.data()), buffer_actual_size); - } else if (data_comp == Compression::ZSTD) { - std::vector compressed_buffer; - compressed_buffer.resize(ZSTD_compressBound(buffer_actual_size)); + } while (buffer_actual_size == buffer.size()); + } else if (data_comp == Compression::ZSTD) { + std::vector buffer(ZSTD_CStreamInSize()); + std::vector compressed_buffer(ZSTD_CStreamOutSize()); + uint64_t buffer_actual_size {0}; - size_t const cSize = ZSTD_compress(compressed_buffer.data(), compressed_buffer.size(), buffer.data(), buffer_actual_size, 0); // 0 is default is probably 3 + ZSTD_CCtx* const cctx = ZSTD_createCCtx(); + ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 0); // default (3) + ZSTD_CCtx_setParameter(cctx, 
ZSTD_c_checksumFlag, 1); // add extra checksums (to frames?) + do { + buffer_actual_size = data_cb(buffer.data(), buffer.size()); + //if (buffer_actual_size == 0) { + //break; + //} + if (buffer_actual_size > buffer.size()) { + // wtf + break; + } + bool const lastChunk = (buffer_actual_size < buffer.size()); - data_file.write(reinterpret_cast(compressed_buffer.data()), cSize); - } - } while (buffer_actual_size == buffer.size()); + ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue; + ZSTD_inBuffer input = { buffer.data(), buffer_actual_size, 0 }; + + while (input.pos < input.size) { + ZSTD_outBuffer output = { compressed_buffer.data(), compressed_buffer.size(), 0 }; + + size_t const remaining = ZSTD_compressStream2(cctx, &output , &input, mode); + if (ZSTD_isError(remaining)) { + std::cerr << "FS error: compressing data failed\n"; + break; + } + + data_file.write(reinterpret_cast(compressed_buffer.data()), output.pos); + + if (remaining == 0) { + break; + } + } + // same as if lastChunk break; + } while (buffer_actual_size == buffer.size()); + } meta_file.flush(); data_file.flush();