diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c26a40f2..28c7938c 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -62,11 +62,11 @@ target_link_libraries(message_fragment_store PUBLIC
 
 ########################################
 
-add_executable(fragment_store_test
+add_executable(test_fragment_store
 	fragment_store/test_fragstore.cpp
 )
 
-target_link_libraries(fragment_store_test PUBLIC
+target_link_libraries(test_fragment_store PUBLIC
 	fragment_store
 )
 
diff --git a/src/fragment_store/fragment_store.cpp b/src/fragment_store/fragment_store.cpp
index 450a016d..66d6f049 100644
--- a/src/fragment_store/fragment_store.cpp
+++ b/src/fragment_store/fragment_store.cpp
@@ -8,6 +8,9 @@
 #include
 
+#include
+#include "./file2_zstd.hpp"
+
 #include
 #include
 
@@ -18,9 +21,12 @@
 #include
 #include
 #include
+#include
+#include
+#include
+#include
 #include
-#include
 
 static const char* metaFileTypeSuffix(MetaFileType mft) {
 	switch (mft) {
@@ -125,7 +131,7 @@ FragmentID FragmentStore::newFragmentFile(
 	_reg.emplace(new_frag, mft);
 
 	// meta needs to be synced to file
-	std::function empty_data_cb = [](const uint8_t*, uint64_t) -> uint64_t { return 0; };
+	std::function empty_data_cb = [](auto*, auto) -> uint64_t { return 0; };
 	if (!syncToStorage(new_frag, empty_data_cb)) {
 		std::cerr << "FS error: syncToStorage failed while creating new fragment file\n";
 		_reg.destroy(new_frag);
@@ -421,72 +427,102 @@ bool FragmentStore::loadFromStorage(FragmentID fid, std::function<read_from_storage_put_data_cb>& data_cb) {
-	std::ifstream data_file{
-		frag_path,
-		std::ios::in | std::ios::binary // always binary, also for text
-	};
+	std::stack<std::unique_ptr<File2I>> data_file_stack;
+	data_file_stack.push(std::make_unique(std::string_view{frag_path}));
 
-	if (!data_file.is_open()) {
+	//std::ifstream data_file{
+		//frag_path,
+		//std::ios::in | std::ios::binary // always binary, also for text
+	//};
+
+	if (!data_file_stack.top()->isGood()) {
 		std::cerr << "FS error: fragment data file failed to open '" << frag_path << "'\n";
 		// error
 		return false;
 	}
 
+	// TODO: decrypt here
+
 	Compression data_comp = Compression::NONE;
 	if (_reg.all_of(fid)) {
 		data_comp = _reg.get(fid).comp;
 	}
 
-	if (data_comp == Compression::NONE) {
-		std::array buffer;
-		uint64_t buffer_actual_size {0};
-		do {
-			data_file.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-			buffer_actual_size = data_file.gcount();
-
-			if (buffer_actual_size == 0) {
-				break;
-			}
-
-			data_cb(buffer.data(), buffer_actual_size);
-		} while (buffer_actual_size == buffer.size() && !data_file.eof());
-	} else if (data_comp == Compression::ZSTD) {
-		std::vector<uint8_t> in_buffer(ZSTD_DStreamInSize());
-		std::vector<uint8_t> out_buffer(ZSTD_DStreamOutSize());
-		std::unique_ptr<ZSTD_DCtx, decltype(&ZSTD_freeDCtx)> dctx{ZSTD_createDCtx(), &ZSTD_freeDCtx};
-
-		uint64_t buffer_actual_size {0};
-		do {
-			data_file.read(reinterpret_cast<char*>(in_buffer.data()), in_buffer.size());
-			buffer_actual_size = data_file.gcount();
-			if (buffer_actual_size == 0) {
-				break;
-			}
-
-			ZSTD_inBuffer input {in_buffer.data(), buffer_actual_size, 0 };
-			do {
-				ZSTD_outBuffer output = { out_buffer.data(), out_buffer.size(), 0 };
-				size_t const ret = ZSTD_decompressStream(dctx.get(), &output , &input);
-				if (ZSTD_isError(ret)) {
-					// error <.<
-					std::cerr << "FS error: decompression error\n";
-					break;
-				}
-
-				data_cb(out_buffer.data(), output.pos);
-			} while (input.pos < input.size);
-		} while (buffer_actual_size == in_buffer.size() && !data_file.eof());
+	// add layer based on enum
+	if (data_comp == Compression::ZSTD) {
+		data_file_stack.push(std::make_unique(*data_file_stack.top().get()));
+		if (!data_file_stack.top()->isGood()) {
+			std::cerr << "FS error: fragment data file failed to add zstd decompression layer '" << frag_path << "'\n";
+			// error
+			return false;
+		}
 	} else {
-		assert(false && "implement me");
+		assert(data_comp == Compression::NONE);
 	}
 
+	//if (data_comp == Compression::NONE) {
+	static constexpr int64_t chunk_size {1024 * 1024};
+	do {
+		auto data_var = data_file_stack.top()->read(chunk_size);
+		ByteSpan data;
+		if (std::holds_alternative<std::vector<uint8_t>>(data_var)) {
+			auto& vec = std::get<std::vector<uint8_t>>(data_var);
+			data = {vec.data(), vec.size()};
+		} else if (std::holds_alternative<ByteSpan>(data_var)) {
+			data = std::get<ByteSpan>(data_var);
+		} else {
+			assert(false);
+		}
+
+		if (data.empty()) {
+			// error or probably eof
+			break;
+		}
+
+		data_cb(data);
+
+		if (data.size < chunk_size) {
+			// eof
+			break;
+		}
+	} while (data_file_stack.top()->isGood());
+	//} else if (data_comp == Compression::ZSTD) {
+		//std::vector<uint8_t> in_buffer(ZSTD_DStreamInSize());
+		//std::vector<uint8_t> out_buffer(ZSTD_DStreamOutSize());
+		//std::unique_ptr<ZSTD_DCtx, decltype(&ZSTD_freeDCtx)> dctx{ZSTD_createDCtx(), &ZSTD_freeDCtx};
+
+		//uint64_t buffer_actual_size {0};
+		//do {
+			//data_file.read(reinterpret_cast<char*>(in_buffer.data()), in_buffer.size());
+			//buffer_actual_size = data_file.gcount();
+			//if (buffer_actual_size == 0) {
+				//break;
+			//}
+
+			//ZSTD_inBuffer input {in_buffer.data(), buffer_actual_size, 0 };
+			//do {
+				//ZSTD_outBuffer output = { out_buffer.data(), out_buffer.size(), 0 };
+				//size_t const ret = ZSTD_decompressStream(dctx.get(), &output , &input);
+				//if (ZSTD_isError(ret)) {
+					//// error <.<
+					//std::cerr << "FS error: decompression error\n";
+					//break;
+				//}
+
+				//data_cb(out_buffer.data(), output.pos);
+			//} while (input.pos < input.size);
+		//} while (buffer_actual_size == in_buffer.size() && !data_file.eof());
+	//} else {
+		//assert(false && "implement me");
+	//}
+
 	return true;
 }
 
 nlohmann::json FragmentStore::loadFromStorageNJ(FragmentID fid) {
 	std::vector<uint8_t> tmp_buffer;
-	std::function<read_from_storage_put_data_cb> cb = [&tmp_buffer](const uint8_t* buffer, const uint64_t buffer_size) {
-		tmp_buffer.insert(tmp_buffer.end(), buffer, buffer+buffer_size);
+	std::function<read_from_storage_put_data_cb> cb = [&tmp_buffer](const ByteSpan buffer) {
+		tmp_buffer.insert(tmp_buffer.end(), buffer.cbegin(), buffer.cend());
 	};
 
 	if (!loadFromStorage(fid, cb)) {
diff --git a/src/fragment_store/fragment_store.hpp b/src/fragment_store/fragment_store.hpp
index 26c52040..d29bff60 100644
--- a/src/fragment_store/fragment_store.hpp
+++ b/src/fragment_store/fragment_store.hpp
@@ -1,5 +1,7 @@
 #pragma once
 
+#include
+
 #include "./fragment_store_i.hpp"
 #include "./types.hpp"
 
@@ -80,7 +82,7 @@ struct FragmentStore : public FragmentStoreI {
 	bool syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size);
 
 	// ========== load fragment data from storage ==========
-	using read_from_storage_put_data_cb = void(const uint8_t* buffer, const uint64_t buffer_size);
+	using read_from_storage_put_data_cb = void(const ByteSpan buffer);
 	bool loadFromStorage(FragmentID fid, std::function<read_from_storage_put_data_cb>& data_cb);
 	// convenience function
 	nlohmann::json loadFromStorageNJ(FragmentID fid);
diff --git a/src/fragment_store/test_fragstore.cpp b/src/fragment_store/test_fragstore.cpp
index 7136643d..1319e29c 100644
--- a/src/fragment_store/test_fragstore.cpp
+++ b/src/fragment_store/test_fragstore.cpp
@@ -50,7 +50,7 @@ int main(void) {
 	{
 		auto frag1h = fs.fragmentHandle(frag1);
 
-		frag1h.emplace_or_replace();
+		frag1h.emplace_or_replace().comp = Compression::ZSTD;
 		frag1h.emplace_or_replace();
 
 		std::function fn_cb = [read = 0ul](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t {
diff --git a/src/fragment_store/uuid_generator.hpp b/src/fragment_store/uuid_generator.hpp
index 755e3be7..b0a1f999 100644
--- a/src/fragment_store/uuid_generator.hpp
+++ b/src/fragment_store/uuid_generator.hpp
@@ -9,6 +9,7 @@ struct UUIDGeneratorI {
 	virtual std::vector operator()(void) = 0;
 };
 
+// TODO: templates?
 struct UUIDGenerator_128_128 final : public UUIDGeneratorI {
 	private:
 		std::array _uuid_namespace;
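
For context on the new read path in loadFromStorage(): the fragment data is now drained through a stack of layered readers, where the bottom layer is the raw file and optional layers (here, zstd decompression) are pushed on top based on the fragment's compression metadata; the loop then reads fixed-size chunks from the top of the stack until a short read or an empty read signals EOF. Below is a minimal, self-contained sketch of that pattern, not the project's actual file2 API: ReadLayerI, MemoryReadLayer, and XorReadLayer are illustrative stand-ins, and a trivial XOR transform takes the place of the zstd decompression layer.

// Illustrative sketch only: stand-in types, not the project's file2 interface.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <stack>
#include <vector>

// Minimal read-layer interface: read() returns up to `size` bytes, empty vector on EOF/error.
struct ReadLayerI {
	virtual ~ReadLayerI(void) = default;
	virtual std::vector<uint8_t> read(size_t size) = 0;
	virtual bool isGood(void) const = 0;
};

// Bottom layer: serves bytes from an in-memory buffer (stands in for the raw fragment file).
struct MemoryReadLayer final : public ReadLayerI {
	std::vector<uint8_t> _data;
	size_t _pos {0};

	explicit MemoryReadLayer(std::vector<uint8_t> data) : _data(std::move(data)) {}

	std::vector<uint8_t> read(size_t size) override {
		const size_t n = std::min(size, _data.size() - _pos);
		std::vector<uint8_t> out(_data.begin() + _pos, _data.begin() + _pos + n);
		_pos += n;
		return out;
	}

	bool isGood(void) const override { return _pos <= _data.size(); }
};

// Transform layer: XORs every byte it pulls from its parent,
// standing in for the zstd decompression layer pushed in the diff.
struct XorReadLayer final : public ReadLayerI {
	ReadLayerI& _parent;
	uint8_t _key;

	XorReadLayer(ReadLayerI& parent, uint8_t key) : _parent(parent), _key(key) {}

	std::vector<uint8_t> read(size_t size) override {
		auto data = _parent.read(size);
		for (auto& b : data) { b ^= _key; }
		return data;
	}

	bool isGood(void) const override { return _parent.isGood(); }
};

int main(void) {
	// build the layer stack bottom-up, like loadFromStorage() does
	std::stack<std::unique_ptr<ReadLayerI>> layers;
	layers.push(std::make_unique<MemoryReadLayer>(std::vector<uint8_t>{0x4b ^ 1, 0x4f ^ 1, 0x21 ^ 1}));
	layers.push(std::make_unique<XorReadLayer>(*layers.top(), 1)); // "decode" layer on top

	// drain the top layer in fixed-size chunks until a short or empty read (eof)
	static constexpr size_t chunk_size {2};
	do {
		auto data = layers.top()->read(chunk_size);
		if (data.empty()) {
			break; // error or eof
		}

		for (const auto b : data) {
			std::printf("%c", static_cast<char>(b)); // stand-in for data_cb()
		}

		if (data.size() < chunk_size) {
			break; // short read -> eof
		}
	} while (layers.top()->isGood());
	std::printf("\n");
	return 0;
}

The point of the stacked design, as the "TODO: decrypt here" comment in the diff hints, is that further layers (decryption, other codecs) can be pushed between the raw file and the consumer without the chunked drain loop changing at all.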