diff --git a/CMakeLists.txt b/CMakeLists.txt index d683aa3..94f24a0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,6 +11,7 @@ endif() message("II SOLANACEAE_OBJECT_STORE_STANDALONE " ${SOLANACEAE_OBJECT_STORE_STANDALONE}) #option(SOLANACEAE_OBJECT_STORE_BUILD_PLUGINS "Build the solanaceae_object_store plugins" ${SOLANACEAE_OBJECT_STORE_STANDALONE}) +option(SOLANACEAE_OBJECT_STORE_BUILD_TESTING "Build the solanaceae_object_store tests" ${SOLANACEAE_OBJECT_STORE_STANDALONE}) if (SOLANACEAE_OBJECT_STORE_STANDALONE) set(CMAKE_POSITION_INDEPENDENT_CODE ON) @@ -66,6 +67,11 @@ endif() add_subdirectory(./src) +if (SOLANACEAE_OBJECT_STORE_BUILD_TESTING) + include(CTest) + add_subdirectory(./test) +endif() + #if (SOLANACEAE_OBJECT_STORE_BUILD_PLUGINS) #add_subdirectory(./plugins) #endif() diff --git a/external/CMakeLists.txt b/external/CMakeLists.txt index 7ea89db..a86bb2a 100644 --- a/external/CMakeLists.txt +++ b/external/CMakeLists.txt @@ -28,6 +28,30 @@ if (NOT TARGET nlohmann_json::nlohmann_json) FetchContent_MakeAvailable(json) endif() +if (NOT TARGET zstd::zstd) + # TODO: try find_package() first + # TODO: try pkg-config next (will work on most distros) + + set(ZSTD_BUILD_STATIC ON) + set(ZSTD_BUILD_SHARED OFF) + set(ZSTD_BUILD_PROGRAMS OFF) + set(ZSTD_BUILD_CONTRIB OFF) + set(ZSTD_BUILD_TESTS OFF) + FetchContent_Declare(zstd + URL "https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz" + DOWNLOAD_EXTRACT_TIMESTAMP TRUE + SOURCE_SUBDIR build/cmake + EXCLUDE_FROM_ALL + ) + FetchContent_MakeAvailable(zstd) + + add_library(zstd INTERFACE) # somehow zstd fkd this up + target_include_directories(zstd INTERFACE ${zstd_SOURCE_DIR}/lib/) + target_link_libraries(zstd INTERFACE libzstd_static) + #TODO: new zstd also provides zstd::libzstd + add_library(zstd::zstd ALIAS zstd) +endif() + #if (NOT TARGET solanaceae_plugin) #FetchContent_Declare(solanaceae_plugin #GIT_REPOSITORY https://github.com/Green-Sky/solanaceae_plugin.git diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 727b60f..2eb079b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,3 +19,17 @@ target_link_libraries(solanaceae_object_store PUBLIC solanaceae_util ) +######################################## + +add_library(solanaceae_file2_zstd + ./solanaceae/file/file2_zstd.hpp + ./solanaceae/file/file2_zstd.cpp +) + +target_include_directories(solanaceae_file2_zstd PUBLIC .) +target_compile_features(solanaceae_file2_zstd PUBLIC cxx_std_17) +target_link_libraries(solanaceae_file2_zstd PUBLIC + solanaceae_file2 + zstd::zstd +) + diff --git a/src/solanaceae/file/file2_zstd.cpp b/src/solanaceae/file/file2_zstd.cpp new file mode 100644 index 0000000..f3ba0ab --- /dev/null +++ b/src/solanaceae/file/file2_zstd.cpp @@ -0,0 +1,209 @@ +#include "./file2_zstd.hpp" + +#include +#include +#include +#include + +#include + +File2ZSTDW::File2ZSTDW(File2I& real) : + File2I(true, false), + _real_file(real) +{ + ZSTD_CCtx_setParameter(_cctx.get(), ZSTD_c_compressionLevel, 0); // default (3) + ZSTD_CCtx_setParameter(_cctx.get(), ZSTD_c_checksumFlag, 1); // add extra checksums (to frames?) +} + +File2ZSTDW::~File2ZSTDW(void) { + // flush remaining data (and maybe header) + // actually nvm, write will always flush all data, so only on empty files this would be an issue +} + +bool File2ZSTDW::isGood(void) { + return _real_file.isGood(); +} + +bool File2ZSTDW::write(const ByteSpan data, int64_t pos) { + if (pos != -1) { + return false; + } + + if (data.empty()) { + return false; // return true? + } + + if (data.size < 16) { + std::cout << "F2ZSTD warning: each write is a zstd frame and compression suffers significantly for small frames.\n"; + } + + std::vector compressed_buffer(ZSTD_CStreamOutSize()); + + ZSTD_inBuffer input = { data.ptr, data.size, 0 }; + + size_t remaining_ret {0}; + do { + // remaining data in input < compressed_buffer size (heuristic) + bool const lastChunk = (input.size - input.pos) <= compressed_buffer.size(); + + ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue; + + ZSTD_outBuffer output = { compressed_buffer.data(), compressed_buffer.size(), 0 }; + + remaining_ret = ZSTD_compressStream2(_cctx.get(), &output , &input, mode); + if (ZSTD_isError(remaining_ret)) { + std::cerr << "F2WRZSTD error: compressing data failed\n"; + break; + } + + _real_file.write(ByteSpan{compressed_buffer.data(), output.pos}); + } while ((input.pos < input.size || remaining_ret != 0) && _real_file.isGood()); + + return _real_file.isGood(); +} + +std::variant> File2ZSTDW::read(uint64_t, int64_t) { + return {}; +} + +// ######################################### decompression + +File2ZSTDR::File2ZSTDR(File2I& real) : + File2I(false, true), + _real_file(real), + + // 64kib + _in_buffer(ZSTD_DStreamInSize()), + _out_buffer(ZSTD_DStreamOutSize()) +{ +} + +bool File2ZSTDR::isGood(void) { + return _real_file.isGood(); +} + +bool File2ZSTDR::write(const ByteSpan, int64_t) { + return false; +} + +std::variant> File2ZSTDR::read(uint64_t size, int64_t pos) { + if (pos != -1) { + // error, only support streaming (for now) + return {}; + } + + std::vector ret_data; + + // actually first we check previous data + if (!_decompressed_buffer.empty()) { + uint64_t required_size = std::min(size, _decompressed_buffer.size()); + ret_data.insert(ret_data.end(), _decompressed_buffer.cbegin(), _decompressed_buffer.cbegin() + required_size); + _decompressed_buffer.erase(_decompressed_buffer.cbegin(), _decompressed_buffer.cbegin() + required_size); + } + + bool eof {false}; + // outerloop here + while (ret_data.size() < size && !eof) { + // first make sure we have data in input + if (_z_input.src == nullptr || _z_input.pos == _z_input.size) { + const auto request_size = _in_buffer.size(); + if (!feedInput(_real_file.read(request_size, -1))) { + return ret_data; + } + + // if _z_input.size < _in_buffer.size() -> assume eof? + if (_z_input.size < request_size) { + eof = true; + //std::cout << "---- eof\n"; + } + } + + do { + ZSTD_outBuffer output = { _out_buffer.data(), _out_buffer.size(), 0 }; + size_t const ret = ZSTD_decompressStream(_dctx.get(), &output , &_z_input); + if (ZSTD_isError(ret)) { + // error <.< + std::cerr << "---- error: decompression error\n"; + return ret_data; + } + + // no new decomp data? + if (output.pos == 0) { + if (ret != 0) { + // if not error and not 0, indicates that + // there is additional flushing needed + continue; + } + + assert(eof || ret == 0); + break; + } + + int64_t returning_size = std::min(int64_t(size) - int64_t(ret_data.size()), output.pos); + assert(returning_size >= 0); + if (returning_size > 0) { + ret_data.insert( + ret_data.end(), + reinterpret_cast(output.dst), + reinterpret_cast(output.dst) + returning_size + ); + } + + // make sure we keep excess decompressed data + if (returning_size < int64_t(output.pos)) { + //const auto remaining_size = output.pos - returning_size; + _decompressed_buffer.insert( + _decompressed_buffer.cend(), + reinterpret_cast(output.dst) + returning_size, + reinterpret_cast(output.dst) + output.pos + ); + } + } while (_z_input.pos < _z_input.size); + } + + return ret_data; +} + +bool File2ZSTDR::feedInput(std::variant>&& read_buff) { + // TODO: optimize, we copy the buffer, but we might not need to + if (std::holds_alternative(read_buff)) { + const auto& span = std::get(read_buff); + if (span.size > _in_buffer.size()) { + // error, how did we read more than we asked for?? + return {}; + } + + if (span.empty()) { + _z_input = { _in_buffer.data(), 0, 0 }; + } else { + // cpy + _in_buffer = static_cast>(span); + _z_input = { + _in_buffer.data(), + span.size, + 0 + }; + } + } else if (std::holds_alternative>(read_buff)) { + auto& vec = std::get>(read_buff); + if (vec.size() > _in_buffer.size()) { + // error, how did we read more than we asked for?? + return {}; + } + + // cpy + _in_buffer = vec; + + _z_input = { + _in_buffer.data(), + _in_buffer.size(), + 0 + }; + } else { + // error, unsupported return value of read?? + return false; + } + + return true; +} + diff --git a/src/solanaceae/file/file2_zstd.hpp b/src/solanaceae/file/file2_zstd.hpp new file mode 100644 index 0000000..110b5a3 --- /dev/null +++ b/src/solanaceae/file/file2_zstd.hpp @@ -0,0 +1,51 @@ +#pragma once + +#include + +#include + +#include + +// zstd compression wrapper over another file +// WARNING: only supports sequential writes +struct File2ZSTDW : public File2I { + File2I& _real_file; + + // TODO: hide this detail? + std::unique_ptr _cctx{ZSTD_createCCtx(), &ZSTD_freeCCtx}; + + File2ZSTDW(File2I& real); + virtual ~File2ZSTDW(void); + + bool isGood(void) override; + + // for simplicity and potential future seekability each write is its own frame + bool write(const ByteSpan data, int64_t pos = -1) override; + std::variant> read(uint64_t size, int64_t pos = -1) override; +}; + +// zstd decompression wrapper over another file +// WARNING: only supports sequential reads +// TODO: add seeking support (use frames) +struct File2ZSTDR : public File2I { + File2I& _real_file; + + // TODO: hide this detail? + std::unique_ptr _dctx{ZSTD_createDCtx(), &ZSTD_freeDCtx}; + std::vector _in_buffer; + std::vector _out_buffer; + std::vector _decompressed_buffer; // retains decompressed unread data between read() calls + ZSTD_inBuffer _z_input{nullptr, 0, 0}; + + File2ZSTDR(File2I& real); + virtual ~File2ZSTDR(void) {} + + bool isGood(void) override; + + bool write(const ByteSpan data, int64_t pos = -1) override; + std::variant> read(uint64_t size, int64_t pos = -1) override; + + private: + bool feedInput(std::variant>&& read_buff); +}; + diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt new file mode 100644 index 0000000..5b16ea8 --- /dev/null +++ b/test/CMakeLists.txt @@ -0,0 +1,14 @@ +cmake_minimum_required(VERSION 3.9...3.24 FATAL_ERROR) + +project(solanaceae) + +add_executable(solanaceae_file2_zstd_test + ./test_file_zstd.cpp +) + +target_link_libraries(solanaceae_file2_zstd_test PUBLIC + solanaceae_file2_zstd +) + +add_test(NAME solanaceae_file2_zstd_test COMMAND solanaceae_file2_zstd_test) + diff --git a/test/test_file_zstd.cpp b/test/test_file_zstd.cpp new file mode 100644 index 0000000..fa2389c --- /dev/null +++ b/test/test_file_zstd.cpp @@ -0,0 +1,393 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +const static std::string_view test_text1{"test1 1234 1234 :) 1234 5678 88888888\n"}; +const static ByteSpan data_test_text1{ + reinterpret_cast(test_text1.data()), + test_text1.size() +}; + +const static std::string_view test_text2{"test2 1234 1234 :) 1234 5678 88888888\n"}; +const static ByteSpan data_test_text2{ + reinterpret_cast(test_text2.data()), + test_text2.size() +}; + +const static std::string_view test_text3{ + "00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n" + "00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n" + "00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n" + "00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n" + "00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n" + "00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n" +}; +const static ByteSpan data_test_text3{ + reinterpret_cast(test_text3.data()), + test_text3.size() +}; + +int main(void) { + { // first do a simple mem backed test + std::vector buffer; + { // write + File2MemW f_w_mem{buffer}; + assert(f_w_mem.isGood()); + + File2ZSTDW f_w_zstd{f_w_mem}; + assert(f_w_zstd.isGood()); + + bool res = f_w_zstd.write(data_test_text1); + assert(res); + assert(f_w_zstd.isGood()); + + // write another frame of the same data + res = f_w_zstd.write(data_test_text2); + assert(res); + assert(f_w_zstd.isGood()); + + // write larger frame + res = f_w_zstd.write(data_test_text3); + assert(res); + assert(f_w_zstd.isGood()); + } + + std::cout << "in mem size: " << buffer.size() << "\n"; + + { // read + File2MemR f_r_mem{ByteSpan{buffer}}; + assert(f_r_mem.isGood()); + + File2ZSTDR f_r_zstd{f_r_mem}; + assert(f_r_zstd.isGood()); + + // reads return owning buffers + + { // readback data_test_text1 + auto r_res_var = f_r_zstd.read(data_test_text1.size); + + //assert(f_r_zstd.isGood()); + //assert(f_r_file.isGood()); + assert(std::holds_alternative>(r_res_var)); + const auto& r_res_vec = std::get>(r_res_var); + + //std::cout << "decomp: " << std::string_view{reinterpret_cast(r_res_vec.data()), r_res_vec.size()}; + + assert(r_res_vec.size() == data_test_text1.size); + assert(std::equal(data_test_text1.cbegin(), data_test_text1.cend(), r_res_vec.cbegin())); + } + + { // readback data_test_text2 + auto r_res_var = f_r_zstd.read(data_test_text2.size); + + //assert(f_r_zstd.isGood()); + //assert(f_r_file.isGood()); + assert(std::holds_alternative>(r_res_var)); + const auto& r_res_vec = std::get>(r_res_var); + + //std::cout << "decomp: " << std::string_view{reinterpret_cast(r_res_vec.data()), r_res_vec.size()}; + + assert(r_res_vec.size() == data_test_text2.size); + assert(std::equal( + data_test_text2.cbegin(), + data_test_text2.cend(), + r_res_vec.cbegin() + )); + } + + { // readback data_test_text3 + auto r_res_var = f_r_zstd.read(data_test_text3.size); + + //assert(f_r_zstd.isGood()); + //assert(f_r_file.isGood()); + assert(std::holds_alternative>(r_res_var)); + const auto& r_res_vec = std::get>(r_res_var); + + //std::cout << "decomp: " << std::string_view{reinterpret_cast(r_res_vec.data()), r_res_vec.size()}; + + assert(r_res_vec.size() == data_test_text3.size); + assert(std::equal( + data_test_text3.cbegin(), + data_test_text3.cend(), + r_res_vec.cbegin() + )); + } + + { // assert eof somehow + // since its eof, reading a single byte should return a zero sized buffer + auto r_res_var = f_r_zstd.read(1); + if (std::holds_alternative>(r_res_var)) { + assert(std::get>(r_res_var).empty()); + } else if (std::holds_alternative(r_res_var)) { + assert(std::get(r_res_var).empty()); + } else { + assert(false); + } + } + } + } + + const auto temp_dir = std::filesystem::temp_directory_path() / "file2_zstd_tests"; + + std::filesystem::create_directories(temp_dir); // making sure + assert(std::filesystem::exists(temp_dir)); + std::cout << "test temp dir: " << temp_dir << "\n"; + + const auto test1_file_path = temp_dir / "testfile1.zstd"; + { // simple write test + File2WFile f_w_file{std::string_view{test1_file_path.u8string()}, true}; + assert(f_w_file.isGood()); + + File2ZSTDW f_w_zstd{f_w_file}; + assert(f_w_zstd.isGood()); + assert(f_w_file.isGood()); + + //bool res = f_w_file.write(data_test_text1); + bool res = f_w_zstd.write(data_test_text1); + assert(res); + assert(f_w_zstd.isGood()); + assert(f_w_file.isGood()); + + // write another frame of the same data + res = f_w_zstd.write(data_test_text2); + assert(res); + assert(f_w_zstd.isGood()); + assert(f_w_file.isGood()); + + // write larger frame + res = f_w_zstd.write(data_test_text3); + assert(res); + assert(f_w_zstd.isGood()); + assert(f_w_file.isGood()); + } + + // after flush + assert(std::filesystem::file_size(test1_file_path) != 0); + + { // simple read test (using write test created file) + File2RFile f_r_file{std::string_view{test1_file_path.u8string()}}; + assert(f_r_file.isGood()); + + File2ZSTDR f_r_zstd{f_r_file}; + assert(f_r_zstd.isGood()); + assert(f_r_file.isGood()); + + // reads return owning buffers + + { // readback data_test_text1 + auto r_res_var = f_r_zstd.read(data_test_text1.size); + + //assert(f_r_zstd.isGood()); + //assert(f_r_file.isGood()); + assert(std::holds_alternative>(r_res_var)); + const auto& r_res_vec = std::get>(r_res_var); + + //std::cout << "decomp: " << std::string_view{reinterpret_cast(r_res_vec.data()), r_res_vec.size()}; + + assert(r_res_vec.size() == data_test_text1.size); + assert(std::equal(data_test_text1.cbegin(), data_test_text1.cend(), r_res_vec.cbegin())); + } + + { // readback data_test_text2 + auto r_res_var = f_r_zstd.read(data_test_text2.size); + + //assert(f_r_zstd.isGood()); + //assert(f_r_file.isGood()); + assert(std::holds_alternative>(r_res_var)); + const auto& r_res_vec = std::get>(r_res_var); + + //std::cout << "decomp: " << std::string_view{reinterpret_cast(r_res_vec.data()), r_res_vec.size()}; + + assert(r_res_vec.size() == data_test_text2.size); + assert(std::equal( + data_test_text2.cbegin(), + data_test_text2.cend(), + r_res_vec.cbegin() + )); + } + + { // readback data_test_text3 + auto r_res_var = f_r_zstd.read(data_test_text3.size); + + //assert(f_r_zstd.isGood()); + //assert(f_r_file.isGood()); + assert(std::holds_alternative>(r_res_var)); + const auto& r_res_vec = std::get>(r_res_var); + + //std::cout << "decomp: " << std::string_view{reinterpret_cast(r_res_vec.data()), r_res_vec.size()}; + + assert(r_res_vec.size() == data_test_text3.size); + assert(std::equal( + data_test_text3.cbegin(), + data_test_text3.cend(), + r_res_vec.cbegin() + )); + } + + { // assert eof somehow + // since its eof, reading a single byte should return a zero sized buffer + auto r_res_var = f_r_zstd.read(1); + if (std::holds_alternative>(r_res_var)) { + assert(std::get>(r_res_var).empty()); + } else if (std::holds_alternative(r_res_var)) { + assert(std::get(r_res_var).empty()); + } else { + assert(false); + } + } + } + + const auto test2_file_path = temp_dir / "testfile2.zstd"; + { // write and read a single frame with increasing size + for (size_t fslog = 1; fslog <= 25; fslog++) { + const size_t frame_size = 1< tmp_data(frame_size); + for (auto& e : tmp_data) { + e = uint8_t(rng_data() & 0xff); // cutoff bad but good enough + } + assert(tmp_data.size() == frame_size); + + bool res = f_w_zstd.write(ByteSpan{tmp_data}); + assert(res); + assert(f_w_zstd.isGood()); + assert(f_w_file.isGood()); + } + + { // read + std::minstd_rand rng_data{11*1337}; + + File2RFile f_r_file{std::string_view{test2_file_path.u8string()}}; + assert(f_r_file.isGood()); + + File2ZSTDR f_r_zstd{f_r_file}; + assert(f_r_zstd.isGood()); + assert(f_r_file.isGood()); + + { // read frame + auto r_res_var = f_r_zstd.read(frame_size); + + assert(std::holds_alternative>(r_res_var)); + const auto& r_res_vec = std::get>(r_res_var); + assert(r_res_vec.size() == frame_size); + + // assert equal + for (auto& e : r_res_vec) { + assert(e == uint8_t(rng_data() & 0xff)); + } + } + + { // eof test + auto r_res_var = f_r_zstd.read(1); + if (std::holds_alternative>(r_res_var)) { + assert(std::get>(r_res_var).empty()); + } else if (std::holds_alternative(r_res_var)) { + assert(std::get(r_res_var).empty()); + } else { + assert(false); + } + } + } + + // since we spam file, we immediatly remove them + std::filesystem::remove(test2_file_path); + } + } + + const auto test3_file_path = temp_dir / "testfile3.zstd"; + { // large file test write + File2WFile f_w_file{std::string_view{test3_file_path.u8string()}, true}; + assert(f_w_file.isGood()); + + File2ZSTDW f_w_zstd{f_w_file}; + assert(f_w_zstd.isGood()); + assert(f_w_file.isGood()); + + std::minstd_rand rng{11*1337}; + std::minstd_rand rng_data{11*1337}; // make investigating easier + + size_t total_raw_size {0}; + for (size_t i = 0; i < 2000; i++) { + const size_t frame_size = (rng() % ((2<<19) - 1)) + 1; + + std::vector tmp_data(frame_size); + for (auto& e : tmp_data) { + e = uint8_t(rng_data() & 0xff); // cutoff bad but good enough + } + + bool res = f_w_zstd.write(ByteSpan{tmp_data}); + assert(res); + assert(f_w_zstd.isGood()); + assert(f_w_file.isGood()); + total_raw_size += frame_size; + } + std::cout << "t3 total raw size: " << total_raw_size << "\n"; + } + + // after flush + std::cout << "t3 size on disk: " << std::filesystem::file_size(test3_file_path) << "\n"; + + { // large file test read + File2RFile f_r_file{std::string_view{test3_file_path.u8string()}}; + assert(f_r_file.isGood()); + + File2ZSTDR f_r_zstd{f_r_file}; + assert(f_r_zstd.isGood()); + assert(f_r_file.isGood()); + + // using same rng state as write to compare + std::minstd_rand rng{11*1337}; + std::minstd_rand rng_data{11*1337}; + + for (size_t i = 0; i < 2000; i++) { + const size_t frame_size = (rng() % ((2<<19) - 1)) + 1; + //std::cerr << "f: " << i << " fs: " << frame_size << "\n"; + + auto r_res_var = f_r_zstd.read(frame_size); + + assert(std::holds_alternative>(r_res_var)); + const auto& r_res_vec = std::get>(r_res_var); + assert(r_res_vec.size() == frame_size); + + // assert equal + for (auto& e : r_res_vec) { + assert(e == uint8_t(rng_data() & 0xff)); + } + } + + { // eof test + auto r_res_var = f_r_zstd.read(1); + if (std::holds_alternative>(r_res_var)) { + assert(std::get>(r_res_var).empty()); + } else if (std::holds_alternative(r_res_var)) { + assert(std::get(r_res_var).empty()); + } else { + assert(false); + } + } + } + + // cleanup + std::filesystem::remove_all(temp_dir); +} +