From 6f511016bc23d140bdc0dc2c9f14260383964748 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sun, 18 Feb 2024 02:40:17 +0100 Subject: [PATCH] save msg json zstd compressed (3x compression) --- external/CMakeLists.txt | 5 +- src/CMakeLists.txt | 2 + src/fragment_store/fragment_store.cpp | 89 ++++++++++++++++--- src/fragment_store/message_fragment_store.cpp | 2 + src/fragment_store/types.hpp | 1 + 5 files changed, 86 insertions(+), 13 deletions(-) diff --git a/external/CMakeLists.txt b/external/CMakeLists.txt index cbcdf2c..a7066be 100644 --- a/external/CMakeLists.txt +++ b/external/CMakeLists.txt @@ -45,5 +45,8 @@ if (NOT TARGET zstd::zstd) ) FetchContent_MakeAvailable(zstd) - add_library(zstd::zstd ALIAS libzstd_static) + add_library(zstd INTERFACE) # somehow zstd fkd this up + target_include_directories(zstd INTERFACE ${zstd_SOURCE_DIR}/lib/) + target_link_libraries(zstd INTERFACE libzstd_static) + add_library(zstd::zstd ALIAS zstd) endif() diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b70c3e9..4c85e70 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,6 +19,8 @@ target_link_libraries(fragment_store PUBLIC EnTT::EnTT solanaceae_util + zstd::zstd + solanaceae_tox_messages # TODO: move ) diff --git a/src/fragment_store/fragment_store.cpp b/src/fragment_store/fragment_store.cpp index 1776f27..4f705a2 100644 --- a/src/fragment_store/fragment_store.cpp +++ b/src/fragment_store/fragment_store.cpp @@ -8,6 +8,8 @@ #include +#include + #include #include #include @@ -282,6 +284,11 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(fid)) { + data_comp = _reg.get(fid).comp; + } + std::ofstream data_file{ _reg.get(fid).path, std::ios::out | std::ios::trunc | std::ios::binary // always binary, also for text @@ -324,8 +331,22 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(res.data()), res.size()); + + // TODO: refactor + if (meta_comp == Compression::NONE) { + meta_file.write(reinterpret_cast(res.data()), res.size()); + } else if (meta_comp == Compression::ZSTD) { + std::vector compressed_buffer; + compressed_buffer.resize(ZSTD_compressBound(res.size())); + + size_t const cSize = ZSTD_compress(compressed_buffer.data(), compressed_buffer.size(), res.data(), res.size(), 0); // 0 is default is probably 3 + + compressed_buffer.resize(cSize); // maybe skip this resize + + meta_file.write(reinterpret_cast(compressed_buffer.data()), compressed_buffer.size()); + } } else if (meta_type == MetaFileType::TEXT_JSON) { + // cant be compressed or encrypted meta_file << meta_data.dump(2, ' ', true); } @@ -342,7 +363,16 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(buffer.data()), buffer_actual_size); + if (data_comp == Compression::NONE) { + data_file.write(reinterpret_cast(buffer.data()), buffer_actual_size); + } else if (data_comp == Compression::ZSTD) { + std::vector compressed_buffer; + compressed_buffer.resize(ZSTD_compressBound(buffer_actual_size)); + + size_t const cSize = ZSTD_compress(compressed_buffer.data(), compressed_buffer.size(), buffer.data(), buffer_actual_size, 0); // 0 is default is probably 3 + + data_file.write(reinterpret_cast(compressed_buffer.data()), cSize); + } } while (buffer_actual_size == buffer.size()); meta_file.flush(); @@ -399,18 +429,53 @@ bool FragmentStore::loadFromStorage(FragmentID fid, std::function buffer; - uint64_t buffer_actual_size {0}; - do { - data_file.read(reinterpret_cast(buffer.data()), buffer.size()); - buffer_actual_size = data_file.gcount(); + Compression data_comp = Compression::NONE; + if (_reg.all_of(fid)) { + data_comp = _reg.get(fid).comp; + } - if (buffer_actual_size == 0) { - break; - } + if (data_comp == Compression::NONE) { + std::array buffer; + uint64_t buffer_actual_size {0}; + do { + data_file.read(reinterpret_cast(buffer.data()), buffer.size()); + buffer_actual_size = data_file.gcount(); - data_cb(buffer.data(), buffer_actual_size); - } while (buffer_actual_size == buffer.size() && !data_file.eof()); + if (buffer_actual_size == 0) { + break; + } + + data_cb(buffer.data(), buffer_actual_size); + } while (buffer_actual_size == buffer.size() && !data_file.eof()); + } else if (data_comp == Compression::ZSTD) { + std::vector in_buffer(ZSTD_DStreamInSize()); + std::vector out_buffer(ZSTD_DStreamOutSize()); + ZSTD_DCtx* const dctx = ZSTD_createDCtx(); + + uint64_t buffer_actual_size {0}; + do { + data_file.read(reinterpret_cast(in_buffer.data()), in_buffer.size()); + buffer_actual_size = data_file.gcount(); + if (buffer_actual_size == 0) { + break; + } + + ZSTD_inBuffer input {in_buffer.data(), buffer_actual_size, 0 }; + do { + ZSTD_outBuffer output = { out_buffer.data(), out_buffer.size(), 0 }; + size_t const ret = ZSTD_decompressStream(dctx, &output , &input); + if (ZSTD_isError(ret)) { + // error <.< + std::cerr << "FS error: decompression error\n"; + break; + } + + data_cb(out_buffer.data(), output.pos); + } while (input.pos < input.size); + } while (buffer_actual_size == in_buffer.size() && !data_file.eof()); + + ZSTD_freeDCtx(dctx); + } return true; } diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index 761a256..b0b4213 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -154,6 +154,8 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { fragment_uid = fh.get().v; + fh.emplace_or_replace().comp = Compression::ZSTD; + auto& new_ts_range = fh.emplace(); new_ts_range.begin = msg_ts; new_ts_range.end = msg_ts; diff --git a/src/fragment_store/types.hpp b/src/fragment_store/types.hpp index cbb64c5..c259bd9 100644 --- a/src/fragment_store/types.hpp +++ b/src/fragment_store/types.hpp @@ -7,6 +7,7 @@ enum class Encryption : uint8_t { }; enum class Compression : uint8_t { NONE = 0x00, + ZSTD = 0x01, }; enum class MetaFileType : uint8_t { TEXT_JSON,