fragment store fully file2 zstd. no more zstd.h in fragment store
This commit is contained in:
parent
268cbe137e
commit
fd0b210bbb
@ -13,8 +13,6 @@
|
|||||||
|
|
||||||
#include "./file2_stack.hpp"
|
#include "./file2_stack.hpp"
|
||||||
|
|
||||||
#include <zstd.h>
|
|
||||||
|
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
@ -281,44 +279,44 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (meta_type == MetaFileType::BINARY_MSGPACK) { // binary metadata file
|
if (meta_type == MetaFileType::BINARY_MSGPACK) { // binary metadata file
|
||||||
|
std::vector<uint8_t> binary_meta_data;
|
||||||
|
{
|
||||||
|
std::stack<std::unique_ptr<File2I>> binary_writer_stack;
|
||||||
|
binary_writer_stack.push(std::make_unique<File2MemW>(binary_meta_data));
|
||||||
|
|
||||||
|
if (!buildStackWrite(binary_writer_stack, meta_enc, meta_comp)) {
|
||||||
|
while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
|
||||||
|
std::filesystem::remove(meta_tmp_path);
|
||||||
|
while (!data_file_stack.empty()) { data_file_stack.pop(); }
|
||||||
|
std::filesystem::remove(data_tmp_path);
|
||||||
|
std::cerr << "FS error: binary writer creation failed '" << _reg.get<FragComp::Ephemeral::FilePath>(fid).path << "'\n";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
const std::vector<uint8_t> meta_data = nlohmann::json::to_msgpack(meta_data_j);
|
const std::vector<uint8_t> meta_data = nlohmann::json::to_msgpack(meta_data_j);
|
||||||
std::vector<uint8_t> meta_data_compressed; // empty if none
|
if (!binary_writer_stack.top()->write(ByteSpan{meta_data})) {
|
||||||
//std::vector<uint8_t> meta_data_encrypted; // empty if none
|
// i feel like exceptions or refactoring would be nice here
|
||||||
|
while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
|
||||||
if (meta_comp == Compression::ZSTD) {
|
std::filesystem::remove(meta_tmp_path);
|
||||||
meta_data_compressed.resize(ZSTD_compressBound(meta_data.size()));
|
while (!data_file_stack.empty()) { data_file_stack.pop(); }
|
||||||
|
std::filesystem::remove(data_tmp_path);
|
||||||
size_t const cSize = ZSTD_compress(meta_data_compressed.data(), meta_data_compressed.size(), meta_data.data(), meta_data.size(), 0); // 0 is default is probably 3
|
std::cerr << "FS error: binary writer failed '" << _reg.get<FragComp::Ephemeral::FilePath>(fid).path << "'\n";
|
||||||
if (ZSTD_isError(cSize)) {
|
return false;
|
||||||
std::cerr << "FS error: compressing meta failed\n";
|
}
|
||||||
meta_data_compressed.clear();
|
|
||||||
meta_comp = Compression::NONE;
|
|
||||||
} else {
|
|
||||||
meta_data_compressed.resize(cSize);
|
|
||||||
}
|
}
|
||||||
} else if (meta_comp == Compression::NONE) {
|
|
||||||
// do nothing
|
|
||||||
} else {
|
|
||||||
assert(false && "implement me");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: encryption
|
//// the meta file is itself msgpack data
|
||||||
|
|
||||||
// the meta file is itself msgpack data
|
|
||||||
nlohmann::json meta_header_j = nlohmann::json::array();
|
nlohmann::json meta_header_j = nlohmann::json::array();
|
||||||
meta_header_j.emplace_back() = "SOLMET";
|
meta_header_j.emplace_back() = "SOLMET";
|
||||||
meta_header_j.push_back(meta_enc);
|
meta_header_j.push_back(meta_enc);
|
||||||
meta_header_j.push_back(meta_comp);
|
meta_header_j.push_back(meta_comp);
|
||||||
|
|
||||||
if (false) { // TODO: encryption
|
// with a custom msgpack impl like cmp, we can be smarter here and dont need an extra buffer
|
||||||
} else if (!meta_data_compressed.empty()) {
|
meta_header_j.push_back(nlohmann::json::binary(binary_meta_data));
|
||||||
meta_header_j.push_back(nlohmann::json::binary(meta_data_compressed));
|
|
||||||
} else {
|
|
||||||
meta_header_j.push_back(nlohmann::json::binary(meta_data));
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto meta_header_data = nlohmann::json::to_msgpack(meta_header_j);
|
const auto meta_header_data = nlohmann::json::to_msgpack(meta_header_j);
|
||||||
//meta_file.write(reinterpret_cast<const char*>(meta_header_data.data()), meta_header_data.size());
|
|
||||||
meta_file_stack.top()->write({meta_header_data.data(), meta_header_data.size()});
|
meta_file_stack.top()->write({meta_header_data.data(), meta_header_data.size()});
|
||||||
} else if (meta_type == MetaFileType::TEXT_JSON) {
|
} else if (meta_type == MetaFileType::TEXT_JSON) {
|
||||||
// cant be compressed or encrypted
|
// cant be compressed or encrypted
|
||||||
@ -337,6 +335,7 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
|
|||||||
|
|
||||||
// now data
|
// now data
|
||||||
// for zstd compression, chunk size is frame size. (no cross frame referencing)
|
// for zstd compression, chunk size is frame size. (no cross frame referencing)
|
||||||
|
// TODO: add buffering steam layer
|
||||||
static constexpr int64_t chunk_size{1024*1024}; // 1MiB should be enough
|
static constexpr int64_t chunk_size{1024*1024}; // 1MiB should be enough
|
||||||
std::vector<uint8_t> buffer(chunk_size);
|
std::vector<uint8_t> buffer(chunk_size);
|
||||||
uint64_t buffer_actual_size {0};
|
uint64_t buffer_actual_size {0};
|
||||||
|
Loading…
Reference in New Issue
Block a user