From def7fc1959811753177b9deb28fa97e2724dedcb Mon Sep 17 00:00:00 2001
From: Green Sky
Date: Thu, 28 Mar 2024 15:36:14 +0100
Subject: [PATCH] add file2 impl for zstd (lightly tested and not integrated yet)

---
Review notes (text in this section is ignored when the patch is applied):
- File2ZSTDW::write reports compression and write failures and only stops
  once the frame has been flushed completely
- File2ZSTDR::feedInput copies into _in_buffer instead of replacing (and
  thereby shrinking) it
- File2ZSTDR::read no longer asserts when a decompress call yields no output
- the commit id above and the blob ids below are still those of the
  original commit

 src/CMakeLists.txt                    |  14 ++
 src/fragment_store/file2_zstd.cpp     | 212 ++++++++++++++++++++++++++
 src/fragment_store/file2_zstd.hpp     |  51 +++++++
 src/fragment_store/test_file_zstd.cpp | 153 +++++++++++++++++++
 4 files changed, 430 insertions(+)
 create mode 100644 src/fragment_store/file2_zstd.cpp
 create mode 100644 src/fragment_store/file2_zstd.hpp
 create mode 100644 src/fragment_store/test_file_zstd.cpp

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4c85e701..ea5ef872 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,6 +1,9 @@
 cmake_minimum_required(VERSION 3.9 FATAL_ERROR)
 
 add_library(fragment_store
+	./fragment_store/file2_zstd.hpp
+	./fragment_store/file2_zstd.cpp
+
 	./fragment_store/fragment_store_i.hpp
 	./fragment_store/fragment_store_i.cpp
 	./fragment_store/types.hpp
@@ -19,6 +22,7 @@ target_link_libraries(fragment_store PUBLIC
 	EnTT::EnTT
 	solanaceae_util
 
+	solanaceae_file2
 	zstd::zstd
 
 	solanaceae_tox_messages # TODO: move
@@ -26,6 +30,16 @@ target_link_libraries(fragment_store PUBLIC
 
 ########################################
 
+add_executable(test_file_zstd
+	fragment_store/test_file_zstd.cpp
+)
+
+target_link_libraries(test_file_zstd PUBLIC
+	fragment_store
+)
+
+########################################
+
 add_library(message_fragment_store
 	./fragment_store/message_serializer.hpp
 	./fragment_store/message_serializer.cpp
diff --git a/src/fragment_store/file2_zstd.cpp b/src/fragment_store/file2_zstd.cpp
new file mode 100644
index 00000000..f43e2fde
--- /dev/null
+++ b/src/fragment_store/file2_zstd.cpp
@@ -0,0 +1,212 @@
+#include "./file2_zstd.hpp"
+
+#include <algorithm>
+#include <cstdint>
+#include <iostream>
+#include <variant>
+#include <vector>
+
+#include <cassert>
+
+File2ZSTDW::File2ZSTDW(File2I& real) :
+	File2I(true, false),
+	_real_file(real)
+{
+	ZSTD_CCtx_setParameter(_cctx.get(), ZSTD_c_compressionLevel, 0); // default (3)
+	ZSTD_CCtx_setParameter(_cctx.get(), ZSTD_c_checksumFlag, 1); // add extra checksums (to frames?)
+}
+
+bool File2ZSTDW::isGood(void) {
+	return _real_file.isGood();
+}
+
+// compresses data into a single zstd frame and appends it to the wrapped file
+// only streaming writes are supported (pos has to be -1)
+// returns false if pos is unsupported, data is empty, compression fails or the
+// wrapped file rejects a write
+bool File2ZSTDW::write(const ByteSpan data, int64_t pos) {
+	if (pos != -1) {
+		return false;
+	}
+
+	if (data.empty()) {
+		return false; // return true?
+	}
+
+	if (data.size < 16) {
+		std::cout << "F2ZSTD warning: each write is a zstd frame and compression suffers significantly for small frames.\n";
+	}
+
+	std::vector<uint8_t> compressed_buffer(ZSTD_CStreamOutSize());
+
+	ZSTD_inBuffer input = { data.ptr, data.size, 0 };
+
+	bool frame_finished {false};
+	do {
+		// remaining data in input < compressed_buffer size (heuristic)
+		bool const lastChunk = (input.size - input.pos) <= compressed_buffer.size();
+
+		ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue;
+
+		ZSTD_outBuffer output = { compressed_buffer.data(), compressed_buffer.size(), 0 };
+
+		size_t const remaining_ret = ZSTD_compressStream2(_cctx.get(), &output, &input, mode);
+		if (ZSTD_isError(remaining_ret)) {
+			std::cerr << "F2WRZSTD error: compressing data failed\n";
+			break;
+		}
+
+		// nothing to forward if zstd only buffered the input internally
+		if (output.pos > 0 && !_real_file.write(ByteSpan{compressed_buffer.data(), output.pos})) {
+			break;
+		}
+
+		// the frame is only complete once a ZSTD_e_end call has nothing left to flush,
+		// fully consumed input alone is not enough
+		frame_finished = lastChunk && remaining_ret == 0;
+	} while (!frame_finished && _real_file.isGood());
+
+	if (!frame_finished) {
+		// abort the partial frame, so the next write starts with a clean session
+		ZSTD_CCtx_reset(_cctx.get(), ZSTD_reset_session_only);
+		return false;
+	}
+
+	return _real_file.isGood();
+}
+
+std::variant<ByteSpan, std::vector<uint8_t>> File2ZSTDW::read(uint64_t, int64_t) {
+	return {};
+}
+
+// ######################################### decompression
+
+File2ZSTDR::File2ZSTDR(File2I& real) :
+	File2I(false, true),
+	_real_file(real),
+
+	// buffer sizes as reported by zstd
+	_in_buffer(ZSTD_DStreamInSize()),
+	_out_buffer(ZSTD_DStreamOutSize())
+{
+}
+
+bool File2ZSTDR::isGood(void) {
+	return _real_file.isGood();
+}
+
+bool File2ZSTDR::write(const ByteSpan, int64_t) {
+	return false;
+}
+
+// decompresses and returns up to size bytes from the wrapped file
+// only streaming reads are supported (pos has to be -1)
+// a result shorter than size means the end of the stream was reached or an error occurred
+std::variant<ByteSpan, std::vector<uint8_t>> File2ZSTDR::read(uint64_t size, int64_t pos) {
+	if (pos != -1) {
+		// error, only support streaming (for now)
+		return {};
+	}
+
+	std::vector<uint8_t> ret_data;
+
+	// actually first we check previous data
+	if (!_decompressed_buffer.empty()) {
+		uint64_t required_size = std::min<uint64_t>(size, _decompressed_buffer.size());
+		ret_data.insert(ret_data.end(), _decompressed_buffer.cbegin(), _decompressed_buffer.cbegin() + required_size);
+		_decompressed_buffer.erase(_decompressed_buffer.cbegin(), _decompressed_buffer.cbegin() + required_size);
+	}
+
+	bool eof {false};
+	// outerloop here
+	while (ret_data.size() < size && !eof) {
+		// first make sure we have data in input
+		if (_z_input.src == nullptr || _z_input.pos == _z_input.size) {
+			const auto request_size = _in_buffer.size();
+			if (!feedInput(_real_file.read(request_size, -1))) {
+				return ret_data;
+			}
+			std::cout << "---- fed input " << _z_input.size << "bytes\n";
+			// if _z_input.size < _in_buffer.size() -> assume eof?
+			if (_z_input.size < request_size) {
+				eof = true;
+				std::cout << "---- eof\n";
+			}
+		}
+
+		do {
+			ZSTD_outBuffer output = { _out_buffer.data(), _out_buffer.size(), 0 };
+			size_t const ret = ZSTD_decompressStream(_dctx.get(), &output, &_z_input);
+			if (ZSTD_isError(ret)) {
+				// error <.<
+				std::cerr << "---- error: decompression error\n";
+				return ret_data;
+			}
+
+			// no new decomp data?
+			// this is not an error: zstd may have only buffered a partial block, so
+			// go back to the outer loop, which refills the input or stops on eof
+			if (output.pos == 0) {
+				break;
+			}
+
+			int64_t returning_size = std::min<int64_t>(int64_t(size) - int64_t(ret_data.size()), output.pos);
+			assert(returning_size >= 0);
+			if (returning_size > 0) {
+				ret_data.insert(
+					ret_data.end(),
+					reinterpret_cast<const uint8_t*>(output.dst),
+					reinterpret_cast<const uint8_t*>(output.dst) + returning_size
+				);
+			}
+
+			// make sure we keep excess decompressed data
+			if (returning_size < int64_t(output.pos)) {
+				//const auto remaining_size = output.pos - returning_size;
+				_decompressed_buffer.insert(
+					_decompressed_buffer.cend(),
+					reinterpret_cast<const uint8_t*>(output.dst) + returning_size,
+					reinterpret_cast<const uint8_t*>(output.dst) + output.pos
+				);
+			}
+		} while (_z_input.pos < _z_input.size);
+	}
+
+	return ret_data;
+}
+
+// copies freshly read raw data into _in_buffer and points _z_input at it
+// _in_buffer keeps its size, since that size is what read() requests from the wrapped file
+// returns false if more data than fits the buffer was supplied
+bool File2ZSTDR::feedInput(std::variant<ByteSpan, std::vector<uint8_t>>&& read_buff) {
+	// TODO: optimize, we copy the buffer, but we might not need to
+	if (std::holds_alternative<ByteSpan>(read_buff)) {
+		const auto& span = std::get<ByteSpan>(read_buff);
+		std::cout << "---- feedInput got span " << span.size << "\n";
+		if (span.size > _in_buffer.size()) {
+			// error, how did we read more than we asked for??
+			return false;
+		}
+
+		// cpy (copy_n with a size of 0 is a no-op)
+		std::copy_n(span.ptr, span.size, _in_buffer.begin());
+		_z_input = { _in_buffer.data(), span.size, 0 };
+	} else if (std::holds_alternative<std::vector<uint8_t>>(read_buff)) {
+		const auto& vec = std::get<std::vector<uint8_t>>(read_buff);
+		std::cout << "---- feedInput got vec " << vec.size() << "\n";
+		if (vec.size() > _in_buffer.size()) {
+			// error, how did we read more than we asked for??
+			return false;
+		}
+
+		// cpy
+		std::copy(vec.cbegin(), vec.cend(), _in_buffer.begin());
+		_z_input = { _in_buffer.data(), vec.size(), 0 };
+	} else {
+		// error, unsupported return value of read??
+		return false;
+	}
+
+	return true;
+}
+
diff --git a/src/fragment_store/file2_zstd.hpp b/src/fragment_store/file2_zstd.hpp
new file mode 100644
index 00000000..7646374a
--- /dev/null
+++ b/src/fragment_store/file2_zstd.hpp
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <solanaceae/file/file2.hpp>
+
+#include <memory>
+
+#include <zstd.h>
+
+// zstd compression wrapper over another file
+// WARNING: only supports sequential writes
+struct File2ZSTDW : public File2I {
+	File2I& _real_file;
+
+	// TODO: hide this detail?
+	std::unique_ptr<ZSTD_CCtx, decltype(&ZSTD_freeCCtx)> _cctx{ZSTD_createCCtx(), &ZSTD_freeCCtx};
+
+	File2ZSTDW(File2I& real);
+	virtual ~File2ZSTDW(void) {}
+
+	bool isGood(void) override;
+
+	// for simplicity and potential future seekability each write is its own frame
+	bool write(const ByteSpan data, int64_t pos = -1) override;
+	std::variant<ByteSpan, std::vector<uint8_t>> read(uint64_t size, int64_t pos = -1) override;
+};
+
+// zstd decompression wrapper over another file
+// WARNING: only supports sequential reads
+// TODO: add seeking support (use frames)
+struct File2ZSTDR : public File2I {
+	File2I& _real_file;
+
+	// TODO: hide this detail?
+	std::unique_ptr<ZSTD_DCtx, decltype(&ZSTD_freeDCtx)> _dctx{ZSTD_createDCtx(), &ZSTD_freeDCtx};
+	std::vector<uint8_t> _in_buffer; // fixed size, also the request size for reads from _real_file
+	std::vector<uint8_t> _out_buffer;
+	std::vector<uint8_t> _decompressed_buffer; // retains decompressed unread data between read() calls
+	ZSTD_inBuffer _z_input{nullptr, 0, 0};
+
+	File2ZSTDR(File2I& real);
+	virtual ~File2ZSTDR(void) {}
+
+	bool isGood(void) override;
+
+	bool write(const ByteSpan data, int64_t pos = -1) override;
+	std::variant<ByteSpan, std::vector<uint8_t>> read(uint64_t size, int64_t pos = -1) override;
+
+	private:
+		bool feedInput(std::variant<ByteSpan, std::vector<uint8_t>>&& read_buff);
+};
+
diff --git a/src/fragment_store/test_file_zstd.cpp b/src/fragment_store/test_file_zstd.cpp
new file mode 100644
index 00000000..b5a13691
--- /dev/null
+++ b/src/fragment_store/test_file_zstd.cpp
@@ -0,0 +1,153 @@
+#include "./file2_zstd.hpp"
+
+#include <solanaceae/util/span.hpp>
+#include <solanaceae/file/file2_std.hpp>
+
+#include <filesystem>
+#include <iostream>
+#include <variant>
+#include <algorithm>
+#include <cassert>
+
+const static std::string_view test_text1{"test1 1234 1234 :) 1234 5678 88888888\n"};
+const static ByteSpan data_test_text1{
+	reinterpret_cast<const uint8_t*>(test_text1.data()),
+	test_text1.size()
+};
+
+const static std::string_view test_text2{"test2 1234 1234 :) 1234 5678 88888888\n"};
+const static ByteSpan data_test_text2{
+	reinterpret_cast<const uint8_t*>(test_text2.data()),
+	test_text2.size()
+};
+
+const static std::string_view test_text3{
+	"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
+	"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
+	"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
+	"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
+	"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
+	"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
+};
+const static ByteSpan data_test_text3{
+	reinterpret_cast<const uint8_t*>(test_text3.data()),
+	test_text3.size()
+};
+
+int main(void) {
+	const auto temp_dir = std::filesystem::temp_directory_path() / "file2wzstdtests";
+
+	std::filesystem::create_directories(temp_dir); // making sure
+	assert(std::filesystem::exists(temp_dir));
+	std::cout << "test temp dir: " << temp_dir << "\n";
+
+	const auto test1_file_path = temp_dir / "testfile1.zstd";
+	{ // simple write test
+		File2WFile f_w_file{test1_file_path.c_str(), true};
+		assert(f_w_file.isGood());
+
+		File2ZSTDW f_w_zstd{f_w_file};
+		assert(f_w_zstd.isGood());
+		assert(f_w_file.isGood());
+
+		//bool res = f_w_file.write(data_test_text1);
+		bool res = f_w_zstd.write(data_test_text1);
+		assert(res);
+		assert(f_w_zstd.isGood());
+		assert(f_w_file.isGood());
+
+		// write another frame of the same data
+		res = f_w_zstd.write(data_test_text2);
+		assert(res);
+		assert(f_w_zstd.isGood());
+		assert(f_w_file.isGood());
+
+		// write larger frame
+		res = f_w_zstd.write(data_test_text3);
+		assert(res);
+		assert(f_w_zstd.isGood());
+		assert(f_w_file.isGood());
+	}
+
+	// after flush
+	assert(std::filesystem::file_size(test1_file_path) != 0);
+
+	{ // simple read test (using write test created file)
+		File2RFile f_r_file{test1_file_path.c_str()};
+		assert(f_r_file.isGood());
+
+		File2ZSTDR f_r_zstd{f_r_file};
+		assert(f_r_zstd.isGood());
+		assert(f_r_file.isGood());
+
+		// reads return owning buffers
+
+		{ // readback data_test_text1
+			auto r_res_var = f_r_zstd.read(data_test_text1.size);
+
+			//assert(f_r_zstd.isGood());
+			//assert(f_r_file.isGood());
+			assert(std::holds_alternative<std::vector<uint8_t>>(r_res_var));
+			const auto& r_res_vec = std::get<std::vector<uint8_t>>(r_res_var);
+
+			//std::cout << "decomp: " << std::string_view{reinterpret_cast<const char*>(r_res_vec.data()), r_res_vec.size()};
+
+			assert(std::get<std::vector<uint8_t>>(r_res_var).size() == data_test_text1.size);
+			assert(std::equal(data_test_text1.cbegin(), data_test_text1.cend(), std::get<std::vector<uint8_t>>(r_res_var).cbegin()));
+		}
+
+		{ // readback data_test_text2
+			auto r_res_var = f_r_zstd.read(data_test_text2.size);
+
+			//assert(f_r_zstd.isGood());
+			//assert(f_r_file.isGood());
+			assert(std::holds_alternative<std::vector<uint8_t>>(r_res_var));
+			const auto& r_res_vec = std::get<std::vector<uint8_t>>(r_res_var);
+
+			//std::cout << "decomp: " << std::string_view{reinterpret_cast<const char*>(r_res_vec.data()), r_res_vec.size()};
+
+			assert(std::get<std::vector<uint8_t>>(r_res_var).size() == data_test_text2.size);
+			assert(std::equal(
+				data_test_text2.cbegin(),
+				data_test_text2.cend(),
+				std::get<std::vector<uint8_t>>(r_res_var).cbegin()
+			));
+		}
+
+		{ // readback data_test_text3
+			auto r_res_var = f_r_zstd.read(data_test_text3.size);
+
+			//assert(f_r_zstd.isGood());
+			//assert(f_r_file.isGood());
+			assert(std::holds_alternative<std::vector<uint8_t>>(r_res_var));
+			const auto& r_res_vec = std::get<std::vector<uint8_t>>(r_res_var);
+
+			//std::cout << "decomp: " << std::string_view{reinterpret_cast<const char*>(r_res_vec.data()), r_res_vec.size()};
+
+			assert(std::get<std::vector<uint8_t>>(r_res_var).size() == data_test_text3.size);
+			assert(std::equal(
+				data_test_text3.cbegin(),
+				data_test_text3.cend(),
+				r_res_vec.cbegin()
+			));
+		}
+
+		// assert eof somehow
+		// since its eof, reading a single byte should return a zero sized buffer
+		// every read past the end has to terminate and stay empty, so try twice
+		for (int eof_read = 0; eof_read < 2; eof_read++) {
+			auto r_res_var = f_r_zstd.read(1);
+			if (std::holds_alternative<std::vector<uint8_t>>(r_res_var)) {
+				assert(std::get<std::vector<uint8_t>>(r_res_var).empty());
+			} else if (std::holds_alternative<ByteSpan>(r_res_var)) {
+				assert(std::get<ByteSpan>(r_res_var).empty());
+			} else {
+				assert(false);
+			}
+		}
+	}
+
+	// cleanup
+	std::filesystem::remove_all(temp_dir);
+}
+