use file2 zstd wrapper to read frag data (untested)

This commit is contained in:
Green Sky 2024-03-31 15:43:01 +02:00
parent 19fd99f713
commit 19844a9423
No known key found for this signature in database
5 changed files with 92 additions and 53 deletions

View File

@ -62,11 +62,11 @@ target_link_libraries(message_fragment_store PUBLIC
######################################## ########################################
add_executable(fragment_store_test add_executable(test_fragment_store
fragment_store/test_fragstore.cpp fragment_store/test_fragstore.cpp
) )
target_link_libraries(fragment_store_test PUBLIC target_link_libraries(test_fragment_store PUBLIC
fragment_store fragment_store
) )

View File

@ -8,6 +8,9 @@
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include <solanaceae/file/file2_std.hpp>
#include "./file2_zstd.hpp"
#include <zstd.h> #include <zstd.h>
#include <cstdint> #include <cstdint>
@ -18,9 +21,12 @@
#include <type_traits> #include <type_traits>
#include <utility> #include <utility>
#include <algorithm> #include <algorithm>
#include <stack>
#include <string_view>
#include <vector>
#include <variant>
#include <iostream> #include <iostream>
#include <vector>
static const char* metaFileTypeSuffix(MetaFileType mft) { static const char* metaFileTypeSuffix(MetaFileType mft) {
switch (mft) { switch (mft) {
@ -125,7 +131,7 @@ FragmentID FragmentStore::newFragmentFile(
_reg.emplace<FragComp::Ephemeral::MetaFileType>(new_frag, mft); _reg.emplace<FragComp::Ephemeral::MetaFileType>(new_frag, mft);
// meta needs to be synced to file // meta needs to be synced to file
std::function<write_to_storage_fetch_data_cb> empty_data_cb = [](const uint8_t*, uint64_t) -> uint64_t { return 0; }; std::function<write_to_storage_fetch_data_cb> empty_data_cb = [](auto*, auto) -> uint64_t { return 0; };
if (!syncToStorage(new_frag, empty_data_cb)) { if (!syncToStorage(new_frag, empty_data_cb)) {
std::cerr << "FS error: syncToStorage failed while creating new fragment file\n"; std::cerr << "FS error: syncToStorage failed while creating new fragment file\n";
_reg.destroy(new_frag); _reg.destroy(new_frag);
@ -421,72 +427,102 @@ bool FragmentStore::loadFromStorage(FragmentID fid, std::function<read_from_stor
std::cout << "FS: loading fragment '" << frag_path << "'\n"; std::cout << "FS: loading fragment '" << frag_path << "'\n";
std::ifstream data_file{ std::stack<std::unique_ptr<File2I>> data_file_stack;
frag_path, data_file_stack.push(std::make_unique<File2RFile>(std::string_view{frag_path}));
std::ios::in | std::ios::binary // always binary, also for text
};
if (!data_file.is_open()) { //std::ifstream data_file{
//frag_path,
//std::ios::in | std::ios::binary // always binary, also for text
//};
if (!data_file_stack.top()->isGood()) {
std::cerr << "FS error: fragment data file failed to open '" << frag_path << "'\n"; std::cerr << "FS error: fragment data file failed to open '" << frag_path << "'\n";
// error // error
return false; return false;
} }
// TODO: decrypt here
Compression data_comp = Compression::NONE; Compression data_comp = Compression::NONE;
if (_reg.all_of<FragComp::DataCompressionType>(fid)) { if (_reg.all_of<FragComp::DataCompressionType>(fid)) {
data_comp = _reg.get<FragComp::DataCompressionType>(fid).comp; data_comp = _reg.get<FragComp::DataCompressionType>(fid).comp;
} }
if (data_comp == Compression::NONE) { // add layer based on enum
std::array<uint8_t, 1024> buffer; if (data_comp == Compression::ZSTD) {
uint64_t buffer_actual_size {0}; data_file_stack.push(std::make_unique<File2ZSTDR>(*data_file_stack.top().get()));
do { if (!data_file_stack.top()->isGood()) {
data_file.read(reinterpret_cast<char*>(buffer.data()), buffer.size()); std::cerr << "FS error: fragment data file failed to add zstd decompression layer '" << frag_path << "'\n";
buffer_actual_size = data_file.gcount(); // error
return false;
if (buffer_actual_size == 0) { }
break;
}
data_cb(buffer.data(), buffer_actual_size);
} while (buffer_actual_size == buffer.size() && !data_file.eof());
} else if (data_comp == Compression::ZSTD) {
std::vector<uint8_t> in_buffer(ZSTD_DStreamInSize());
std::vector<uint8_t> out_buffer(ZSTD_DStreamOutSize());
std::unique_ptr<ZSTD_DCtx, decltype(&ZSTD_freeDCtx)> dctx{ZSTD_createDCtx(), &ZSTD_freeDCtx};
uint64_t buffer_actual_size {0};
do {
data_file.read(reinterpret_cast<char*>(in_buffer.data()), in_buffer.size());
buffer_actual_size = data_file.gcount();
if (buffer_actual_size == 0) {
break;
}
ZSTD_inBuffer input {in_buffer.data(), buffer_actual_size, 0 };
do {
ZSTD_outBuffer output = { out_buffer.data(), out_buffer.size(), 0 };
size_t const ret = ZSTD_decompressStream(dctx.get(), &output , &input);
if (ZSTD_isError(ret)) {
// error <.<
std::cerr << "FS error: decompression error\n";
break;
}
data_cb(out_buffer.data(), output.pos);
} while (input.pos < input.size);
} while (buffer_actual_size == in_buffer.size() && !data_file.eof());
} else { } else {
assert(false && "implement me"); assert(data_comp == Compression::NONE);
} }
//if (data_comp == Compression::NONE) {
static constexpr int64_t chunk_size {1024 * 1024};
do {
auto data_var = data_file_stack.top()->read(chunk_size);
ByteSpan data;
if (std::holds_alternative<std::vector<uint8_t>>(data_var)) {
auto& vec = std::get<std::vector<uint8_t>>(data_var);
data = {vec.data(), vec.size()};
} else if (std::holds_alternative<ByteSpan>(data_var)) {
data = std::get<ByteSpan>(data_var);
} else {
assert(false);
}
if (data.empty()) {
// error or probably eof
break;
}
data_cb(data);
if (data.size < chunk_size) {
// eof
break;
}
} while (data_file_stack.top()->isGood());
//} else if (data_comp == Compression::ZSTD) {
//std::vector<uint8_t> in_buffer(ZSTD_DStreamInSize());
//std::vector<uint8_t> out_buffer(ZSTD_DStreamOutSize());
//std::unique_ptr<ZSTD_DCtx, decltype(&ZSTD_freeDCtx)> dctx{ZSTD_createDCtx(), &ZSTD_freeDCtx};
//uint64_t buffer_actual_size {0};
//do {
//data_file.read(reinterpret_cast<char*>(in_buffer.data()), in_buffer.size());
//buffer_actual_size = data_file.gcount();
//if (buffer_actual_size == 0) {
//break;
//}
//ZSTD_inBuffer input {in_buffer.data(), buffer_actual_size, 0 };
//do {
//ZSTD_outBuffer output = { out_buffer.data(), out_buffer.size(), 0 };
//size_t const ret = ZSTD_decompressStream(dctx.get(), &output , &input);
//if (ZSTD_isError(ret)) {
//// error <.<
//std::cerr << "FS error: decompression error\n";
//break;
//}
//data_cb(out_buffer.data(), output.pos);
//} while (input.pos < input.size);
//} while (buffer_actual_size == in_buffer.size() && !data_file.eof());
//} else {
//assert(false && "implement me");
//}
return true; return true;
} }
nlohmann::json FragmentStore::loadFromStorageNJ(FragmentID fid) { nlohmann::json FragmentStore::loadFromStorageNJ(FragmentID fid) {
std::vector<uint8_t> tmp_buffer; std::vector<uint8_t> tmp_buffer;
std::function<read_from_storage_put_data_cb> cb = [&tmp_buffer](const uint8_t* buffer, const uint64_t buffer_size) { std::function<read_from_storage_put_data_cb> cb = [&tmp_buffer](const ByteSpan buffer) {
tmp_buffer.insert(tmp_buffer.end(), buffer, buffer+buffer_size); tmp_buffer.insert(tmp_buffer.end(), buffer.cbegin(), buffer.cend());
}; };
if (!loadFromStorage(fid, cb)) { if (!loadFromStorage(fid, cb)) {

View File

@ -1,5 +1,7 @@
#pragma once #pragma once
#include <solanaceae/util/span.hpp>
#include "./fragment_store_i.hpp" #include "./fragment_store_i.hpp"
#include "./types.hpp" #include "./types.hpp"
@ -80,7 +82,7 @@ struct FragmentStore : public FragmentStoreI {
bool syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size); bool syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size);
// ========== load fragment data from storage ========== // ========== load fragment data from storage ==========
using read_from_storage_put_data_cb = void(const uint8_t* buffer, const uint64_t buffer_size); using read_from_storage_put_data_cb = void(const ByteSpan buffer);
bool loadFromStorage(FragmentID fid, std::function<read_from_storage_put_data_cb>& data_cb); bool loadFromStorage(FragmentID fid, std::function<read_from_storage_put_data_cb>& data_cb);
// convenience function // convenience function
nlohmann::json loadFromStorageNJ(FragmentID fid); nlohmann::json loadFromStorageNJ(FragmentID fid);

View File

@ -50,7 +50,7 @@ int main(void) {
{ {
auto frag1h = fs.fragmentHandle(frag1); auto frag1h = fs.fragmentHandle(frag1);
frag1h.emplace_or_replace<FragComp::DataCompressionType>(); frag1h.emplace_or_replace<FragComp::DataCompressionType>().comp = Compression::ZSTD;
frag1h.emplace_or_replace<FragComp::DataEncryptionType>(); frag1h.emplace_or_replace<FragComp::DataEncryptionType>();
std::function<FragmentStore::write_to_storage_fetch_data_cb> fn_cb = [read = 0ul](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t { std::function<FragmentStore::write_to_storage_fetch_data_cb> fn_cb = [read = 0ul](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t {

View File

@ -9,6 +9,7 @@ struct UUIDGeneratorI {
virtual std::vector<uint8_t> operator()(void) = 0; virtual std::vector<uint8_t> operator()(void) = 0;
}; };
// TODO: templates?
struct UUIDGenerator_128_128 final : public UUIDGeneratorI { struct UUIDGenerator_128_128 final : public UUIDGeneratorI {
private: private:
std::array<uint8_t, 16> _uuid_namespace; std::array<uint8_t, 16> _uuid_namespace;