diff --git a/src/fragment_store/README.md b/src/fragment_store/README.md index a1d18a82..0eba57b3 100644 --- a/src/fragment_store/README.md +++ b/src/fragment_store/README.md @@ -40,25 +40,26 @@ A Metadata json object can have arbitrary keys, some are predefined: ### Split Metadata -file magic bytes `SOLMET` (6 bytes) +msgpack array: -1 byte encryption type (`0x00` is none) - -1 byte compression type (`0x00` is none) - -...metadata here... +- `[0]`: file magic string `SOLMET` (6 bytes) +- `[1]`: uint8 encryption type (`0x00` is none) +- `[2]`: uint8 compression type (`0x00` is none, `0x01` is zstd) +- `[3]`: binary metadata (optionally compressed and encrypted) note that the encryption and compression are for the metadata only. The metadata itself contains encryption and compression info about the data. ### Split Data -(none) all the data is in the metadata file. +All the metadata is in the metadata file. (like encryption and compression) This is mostly to allow direct storage for files in the Fragment store without excessive duplication. Keep in mind to not use the actual file name as the data/meta file name. ### Single fragment +Note: this format is unused for now + file magic bytes `SOLFIL` (6 bytes) 1 byte encryption type (`0x00` is none) diff --git a/src/fragment_store/fragment_store.cpp b/src/fragment_store/fragment_store.cpp index 6b4b40eb..a32573ac 100644 --- a/src/fragment_store/fragment_store.cpp +++ b/src/fragment_store/fragment_store.cpp @@ -224,15 +224,6 @@ FragmentID FragmentStore::getFragmentCustomMatcher( return entt::null; } -template -static void writeBinaryMetafileHeader(F& file, const Encryption enc, const Compression comp) { - file.write("SOLMET", 6); - file.put(static_cast>(enc)); - - // TODO: is compressiontype encrypted? - file.put(static_cast>(comp)); -} - bool FragmentStore::syncToStorage(FragmentID fid, std::function& data_cb) { if (!_reg.valid(fid)) { return false; @@ -298,13 +289,8 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::functionsecond(storage.value(fid), meta_data[storage.type().hash()]); //} else if (meta_type == MetaFileType::TEXT_JSON) { - s_cb_it->second({_reg, fid}, meta_data[storage.type().name()]); + s_cb_it->second({_reg, fid}, meta_data_j[storage.type().name()]); //} } if (meta_type == MetaFileType::BINARY_MSGPACK) { // binary metadata file - const auto res = nlohmann::json::to_msgpack(meta_data); + const std::vector meta_data = nlohmann::json::to_msgpack(meta_data_j); + std::vector meta_data_compressed; // empty if none + //std::vector meta_data_encrypted; // empty if none - // TODO: refactor - if (meta_comp == Compression::NONE) { - meta_file.write(reinterpret_cast(res.data()), res.size()); - } else if (meta_comp == Compression::ZSTD) { - std::vector compressed_buffer; - compressed_buffer.resize(ZSTD_compressBound(res.size())); + if (meta_comp == Compression::ZSTD) { + meta_data_compressed.resize(ZSTD_compressBound(meta_data.size())); - size_t const cSize = ZSTD_compress(compressed_buffer.data(), compressed_buffer.size(), res.data(), res.size(), 0); // 0 is default is probably 3 + size_t const cSize = ZSTD_compress(meta_data_compressed.data(), meta_data_compressed.size(), meta_data.data(), meta_data.size(), 0); // 0 is default is probably 3 if (ZSTD_isError(cSize)) { std::cerr << "FS error: compressing meta failed\n"; - return false; // HACK + meta_data_compressed.clear(); + meta_comp = Compression::NONE; + } else { + meta_data_compressed.resize(cSize); } - - compressed_buffer.resize(cSize); // maybe skip this resize - - meta_file.write(reinterpret_cast(compressed_buffer.data()), compressed_buffer.size()); + } else if (meta_comp == Compression::NONE) { + // do nothing + } else { + assert(false && "implement me"); } + + // TODO: encryption + + // the meta file is itself msgpack data + nlohmann::json meta_header_j = nlohmann::json::array(); + meta_header_j.emplace_back() = "SOLMET"; + meta_header_j.push_back(meta_enc); + meta_header_j.push_back(meta_comp); + + if (false) { // TODO: encryption + } else if (!meta_data_compressed.empty()) { + meta_header_j.push_back(nlohmann::json::binary(meta_data_compressed)); + } else { + meta_header_j.push_back(nlohmann::json::binary(meta_data)); + } + + const auto meta_header_data = nlohmann::json::to_msgpack(meta_header_j); + meta_file.write(reinterpret_cast(meta_header_data.data()), meta_header_data.size()); } else if (meta_type == MetaFileType::TEXT_JSON) { // cant be compressed or encrypted - meta_file << meta_data.dump(2, ' ', true); + meta_file << meta_data_j.dump(2, ' ', true); } // now data @@ -409,6 +414,8 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function> j; + // file is a msgpack within a msgpack - if (!j.is_object()) { - std::cerr << "FS error: json in meta is broken " << it.id_str << "\n"; + std::vector full_meta_data; + { // read meta file + // figure out size + file.seekg(0, file.end); + uint64_t file_size = file.tellg(); + file.seekg(0, file.beg); + + full_meta_data.resize(file_size); + + file.read(reinterpret_cast(full_meta_data.data()), full_meta_data.size()); + } + + const auto meta_header_j = nlohmann::json::from_msgpack(full_meta_data); + + if (!meta_header_j.is_array() || meta_header_j.size() < 4) { + std::cerr << "FS error: broken binary meta " << it.frag_path << "\n"; continue; } - // TODO: existing fragment file - //newFragmentFile(); - FragmentHandle fh{_reg, _reg.create()}; - fh.emplace(hex2bin(it.id_str)); - - fh.emplace(it.frag_path.generic_u8string()); - - for (const auto& [k, v] : j.items()) { - // type id from string hash - const auto type_id = entt::hashed_string(k.data(), k.size()); - const auto deserl_fn_it = _sc._deserl_json.find(type_id); - if (deserl_fn_it != _sc._deserl_json.cend()) { - // TODO: check return value - deserl_fn_it->second(fh, v); - } else { - std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n"; - } + if (meta_header_j.at(0) != "SOLMET") { + std::cerr << "FS error: wrong magic '" << meta_header_j.at(0) << "' in meta " << it.frag_path << "\n"; + continue; } - // throw new frag event here - throwEventConstruct(fh); - count++; + + Encryption meta_enc = meta_header_j.at(1); + if (meta_enc != Encryption::NONE) { + std::cerr << "FS error: unknown encryption " << it.frag_path << "\n"; + continue; + } + + Compression meta_comp = meta_header_j.at(2); + if (meta_comp != Compression::NONE && meta_comp != Compression::ZSTD) { + std::cerr << "FS error: unknown compression " << it.frag_path << "\n"; + continue; + } + + //const auto& meta_data_ref = meta_header_j.at(3).is_binary()?meta_header_j.at(3):meta_header_j.at(3).at("data"); + if (!meta_header_j.at(3).is_binary()) { + std::cerr << "FS error: meta data not binary " << it.frag_path << "\n"; + continue; + } + const nlohmann::json::binary_t& meta_data_ref = meta_header_j.at(3); + + std::vector meta_data_decomp; + if (meta_comp == Compression::NONE) { + // do nothing + } else if (meta_comp == Compression::ZSTD) { + meta_data_decomp.resize(ZSTD_DStreamOutSize()); + ZSTD_DCtx* const dctx = ZSTD_createDCtx(); + + ZSTD_inBuffer input {meta_data_ref.data(), meta_data_ref.size(), 0 }; + ZSTD_outBuffer output = { meta_data_decomp.data(), meta_data_decomp.size(), 0 }; + do { + size_t const ret = ZSTD_decompressStream(dctx, &output , &input); + if (ZSTD_isError(ret)) { + // error <.< + std::cerr << "FS error: decompression error\n"; + meta_data_decomp.clear(); + break; + } + } while (input.pos < input.size); + meta_data_decomp.resize(output.pos); + + ZSTD_freeDCtx(dctx); + } else { + assert(false && "implement me"); + } + + // TODO: enc + + if (!meta_data_decomp.empty()) { + j = nlohmann::json::from_msgpack(meta_data_decomp); + } else { + j = nlohmann::json::from_msgpack(meta_data_ref); + } + } else if (it.meta_ext == ".meta.json") { + std::ifstream file(it.frag_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary); + if (!file.is_open()) { + std::cerr << "FS error: failed opening meta " << it.frag_path << "\n"; + continue; + } + + file >> j; } else { assert(false); } + + if (!j.is_object()) { + std::cerr << "FS error: json in meta is broken " << it.id_str << "\n"; + continue; + } + + // TODO: existing fragment file + //newFragmentFile(); + FragmentHandle fh{_reg, _reg.create()}; + fh.emplace(hex2bin(it.id_str)); + + fh.emplace(it.frag_path.generic_u8string()); + + for (const auto& [k, v] : j.items()) { + // type id from string hash + const auto type_id = entt::hashed_string(k.data(), k.size()); + const auto deserl_fn_it = _sc._deserl_json.find(type_id); + if (deserl_fn_it != _sc._deserl_json.cend()) { + // TODO: check return value + deserl_fn_it->second(fh, v); + } else { + std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n"; + } + } + // throw new frag event here + throwEventConstruct(fh); + count++; } return count; diff --git a/src/fragment_store/fs_binary_msgpack1.png b/src/fragment_store/fs_binary_msgpack1.png new file mode 100644 index 00000000..b536b203 Binary files /dev/null and b/src/fragment_store/fs_binary_msgpack1.png differ diff --git a/src/fragment_store/fs_binary_msgpack2.png b/src/fragment_store/fs_binary_msgpack2.png new file mode 100644 index 00000000..cd7a9a53 Binary files /dev/null and b/src/fragment_store/fs_binary_msgpack2.png differ diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index b0b42130..a39fc7d1 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -145,7 +145,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // if its still not found, we need a new fragment if (fragment_uid.empty()) { - const auto new_fid = _fs.newFragmentFile("test_message_store/", MetaFileType::TEXT_JSON); + const auto new_fid = _fs.newFragmentFile("test_message_store/", MetaFileType::BINARY_MSGPACK); auto fh = _fs.fragmentHandle(new_fid); if (!static_cast(fh)) { std::cout << "MFS error: failed to create new fragment for message\n"; @@ -154,6 +154,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { fragment_uid = fh.get().v; + fh.emplace_or_replace().comp = Compression::ZSTD; fh.emplace_or_replace().comp = Compression::ZSTD; auto& new_ts_range = fh.emplace(); diff --git a/src/fragment_store/types.hpp b/src/fragment_store/types.hpp index c259bd9b..97cfb144 100644 --- a/src/fragment_store/types.hpp +++ b/src/fragment_store/types.hpp @@ -8,10 +8,12 @@ enum class Encryption : uint8_t { enum class Compression : uint8_t { NONE = 0x00, ZSTD = 0x01, + // TODO: zstd without magic + // TODO: zstd meta dict + // TODO: zstd data(message) dict }; enum class MetaFileType : uint8_t { TEXT_JSON, - //BINARY_ARB, - BINARY_MSGPACK, + BINARY_MSGPACK, // msgpacked msgpack };