From 3cede91aa01d84d2f0bf779fc2e2c2518d3d3c2c Mon Sep 17 00:00:00 2001 From: Green Sky Date: Tue, 9 Apr 2024 19:36:35 +0200 Subject: [PATCH] more work on backend and moving frags to objs --- src/CMakeLists.txt | 9 + .../backends/filesystem_storage.cpp | 377 ++++++++++++++++-- .../backends/filesystem_storage.hpp | 16 +- src/fragment_store/convert_frag_to_obj.cpp | 36 ++ src/fragment_store/fragment_store.cpp | 44 +- src/fragment_store/fragment_store.hpp | 4 - src/fragment_store/message_fragment_store.cpp | 88 ++-- src/fragment_store/message_fragment_store.hpp | 8 +- src/fragment_store/meta_components.hpp | 42 +- src/fragment_store/meta_components_id.inl | 4 + src/fragment_store/object_store.cpp | 36 -- src/fragment_store/object_store.hpp | 2 - 12 files changed, 514 insertions(+), 152 deletions(-) create mode 100644 src/fragment_store/convert_frag_to_obj.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 76b371e6..09d10aa7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -80,6 +80,15 @@ target_link_libraries(test_fragment_store PUBLIC ######################################## +add_executable(convert_frag_to_obj + fragment_store/convert_frag_to_obj.cpp +) + +target_link_libraries(convert_frag_to_obj PUBLIC + fragment_store +) + +######################################## add_executable(tomato ./main.cpp ./icon.rc diff --git a/src/fragment_store/backends/filesystem_storage.cpp b/src/fragment_store/backends/filesystem_storage.cpp index 895cad98..8b8685bb 100644 --- a/src/fragment_store/backends/filesystem_storage.cpp +++ b/src/fragment_store/backends/filesystem_storage.cpp @@ -2,6 +2,9 @@ #include "../meta_components.hpp" +#include + +#include #include #include @@ -22,9 +25,65 @@ static const char* metaFileTypeSuffix(MetaFileType mft) { return ""; // .unk? } +// TODO: move to ... somewhere. (span? file2i?) +static ByteSpan spanFromRead(const std::variant>& data_var) { + if (std::holds_alternative>(data_var)) { + auto& vec = std::get>(data_var); + return {vec.data(), vec.size()}; + } else if (std::holds_alternative(data_var)) { + return std::get(data_var); + } else { + assert(false); + return {}; + } +} + +// TODO: move somewhere else +static bool serl_json_data_enc_type(const ObjectHandle oh, nlohmann::json& out) { + out = static_cast>( + oh.get().enc + ); + return true; +} + +static bool deserl_json_data_enc_type(ObjectHandle oh, const nlohmann::json& in) { + oh.emplace_or_replace( + static_cast( + static_cast>(in) + ) + ); + return true; +} + +static bool serl_json_data_comp_type(const ObjectHandle oh, nlohmann::json& out) { + out = static_cast>( + oh.get().comp + ); + return true; +} + +static bool deserl_json_data_comp_type(ObjectHandle oh, const nlohmann::json& in) { + oh.emplace_or_replace( + static_cast( + static_cast>(in) + ) + ); + return true; +} + namespace backend { -FilesystemStorage::FilesystemStorage(ObjectStore2& os) : StorageBackendI::StorageBackendI(os) { +FilesystemStorage::FilesystemStorage(ObjectStore2& os, std::string_view storage_path) : StorageBackendI::StorageBackendI(os), _storage_path(storage_path) { + _sc.registerSerializerJson(serl_json_data_enc_type); + _sc.registerDeSerializerJson(deserl_json_data_enc_type); + _sc.registerSerializerJson(serl_json_data_comp_type); + _sc.registerDeSerializerJson(deserl_json_data_comp_type); + + // old stuff + _sc.registerSerializerJson(serl_json_data_enc_type); + _sc.registerDeSerializerJson(deserl_json_data_enc_type); + _sc.registerSerializerJson(serl_json_data_comp_type); + _sc.registerDeSerializerJson(deserl_json_data_comp_type); } FilesystemStorage::~FilesystemStorage(void) { @@ -39,38 +98,38 @@ bool FilesystemStorage::write(Object o, std::function()) { + if (!oh.all_of()) { // not a file fragment? return false; } - // split object storage + // split object storage (meta and data are 2 files) - MetaFileType meta_type = MetaFileType::TEXT_JSON; // TODO: better defaults - if (reg.all_of(o)) { - meta_type = oh.get().type; + MetaFileType meta_type = MetaFileType::TEXT_JSON; // TODO: better defaults? + if (reg.all_of(o)) { + meta_type = oh.get().type; } - Encryption meta_enc = Encryption::NONE; // TODO: better defaults - Compression meta_comp = Compression::NONE; // TODO: better defaults + Encryption meta_enc = Encryption::NONE; // TODO: better defaults? + Compression meta_comp = Compression::NONE; // TODO: better defaults? if (meta_type != MetaFileType::TEXT_JSON) { - if (oh.all_of()) { - meta_enc = oh.get().enc; + if (oh.all_of()) { + meta_enc = oh.get().enc; } - if (oh.all_of()) { - meta_comp = oh.get().comp; + if (oh.all_of()) { + meta_comp = oh.get().comp; } } else { // we cant have encryption or compression // so we force NONE for TEXT JSON - oh.emplace_or_replace(Encryption::NONE); - oh.emplace_or_replace(Compression::NONE); + oh.emplace_or_replace(Encryption::NONE); + oh.emplace_or_replace(Compression::NONE); } - std::filesystem::path meta_tmp_path = oh.get().path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp"; + std::filesystem::path meta_tmp_path = oh.get().path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp"; meta_tmp_path.replace_filename("." + meta_tmp_path.filename().generic_u8string()); // TODO: make meta comp work with mem compressor //auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp); @@ -85,14 +144,14 @@ bool FilesystemStorage::write(Object o, std::function()) { - data_enc = oh.get().enc; + if (oh.all_of()) { + data_enc = oh.get().enc; } - if (oh.all_of()) { - data_comp = oh.get().comp; + if (oh.all_of()) { + data_comp = oh.get().comp; } - std::filesystem::path data_tmp_path = oh.get().path + ".tmp"; + std::filesystem::path data_tmp_path = oh.get().path + ".tmp"; data_tmp_path.replace_filename("." + data_tmp_path.filename().generic_u8string()); auto data_file_stack = buildFileStackWrite(std::string_view{data_tmp_path.generic_u8string()}, data_enc, data_comp); if (data_file_stack.empty()) { @@ -107,6 +166,7 @@ bool FilesystemStorage::write(Object o, std::function().path << "'\n"; + std::cerr << "FS error: binary writer creation failed '" << oh.get().path << "'\n"; return false; } @@ -152,7 +212,7 @@ bool FilesystemStorage::write(Object o, std::function().path << "'\n"; + std::cerr << "FS error: binary writer failed '" << oh.get().path << "'\n"; return false; } } @@ -203,27 +263,24 @@ bool FilesystemStorage::write(Object o, std::functionwrite({buffer.data(), buffer_actual_size}); } while (buffer_actual_size == buffer.size()); - //meta_file.flush(); - //meta_file.close(); + // flush // TODO: use scope while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope? - //data_file.flush(); - //data_file.close(); while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope? std::filesystem::rename( meta_tmp_path, - oh.get().path + ".meta" + metaFileTypeSuffix(meta_type) + oh.get().path + ".meta" + metaFileTypeSuffix(meta_type) ); std::filesystem::rename( data_tmp_path, - oh.get().path + oh.get().path ); // TODO: check return value of renames - if (oh.all_of()) { - oh.remove(); + if (oh.all_of()) { + oh.remove(); } return true; @@ -233,5 +290,261 @@ bool FilesystemStorage::read(Object o, std::function file_obj_list; + + std::filesystem::path storage_path{path}; + + auto handle_file = [&](const std::filesystem::path& file_path) { + if (!std::filesystem::is_regular_file(file_path)) { + return; + } + // handle file + + if (file_path.has_extension()) { + // skip over metadata, assuming only metafiles have extentions (might be wrong?) + // also skips temps + return; + } + + auto relative_path = std::filesystem::proximate(file_path, storage_path); + std::string id_str = relative_path.generic_u8string(); + // delete all '/' + id_str.erase(std::remove(id_str.begin(), id_str.end(), '/'), id_str.end()); + if (id_str.size() % 2 != 0) { + std::cerr << "FS error: non hex object uid detected: '" << id_str << "'\n"; + } + + if (file_obj_list.contains(ObjFileEntry{id_str, {}, ""})) { + std::cerr << "FS error: object duplicate detected: '" << id_str << "'\n"; + return; // skip + } + + const char* meta_ext = ".meta.msgpack"; + { // find meta + // TODO: this as to know all possible extentions + bool has_meta_msgpack = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.msgpack"); + bool has_meta_json = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.json"); + const size_t meta_sum = + (has_meta_msgpack?1:0) + + (has_meta_json?1:0) + ; + + if (meta_sum > 1) { // has multiple + std::cerr << "FS error: object with multiple meta files detected: " << id_str << "\n"; + return; // skip + } + + if (meta_sum == 0) { + std::cerr << "FS error: object missing meta file detected: " << id_str << "\n"; + return; // skip + } + + if (has_meta_json) { + meta_ext = ".meta.json"; + } + } + + file_obj_list.emplace(ObjFileEntry{ + std::move(id_str), + file_path, + meta_ext + }); + }; + + for (const auto& outer_path : std::filesystem::directory_iterator(storage_path)) { + if (std::filesystem::is_regular_file(outer_path)) { + handle_file(outer_path); + } else if (std::filesystem::is_directory(outer_path)) { + // subdir, part of id + for (const auto& inner_path : std::filesystem::directory_iterator(outer_path)) { + //if (std::filesystem::is_regular_file(inner_path)) { + + //// handle file + //} // TODO: support deeper recursion? + handle_file(inner_path); + } + } + } + + std::cout << "FS: scan found:\n"; + for (const auto& it : file_obj_list) { + std::cout << " " << it.id_str << "\n"; + } + + // step 2: check if files preexist in reg + // main thread + // (merge into step 3 and do more error checking?) + // TODO: properly handle duplicates, dups form different backends might be legit + // important +#if 0 + for (auto it = file_obj_list.begin(); it != file_obj_list.end();) { + auto id = hex2bin(it->id_str); + auto fid = getFragmentByID(id); + if (_reg.valid(fid)) { + // pre exising (handle differently??) + // check if store differs? + it = file_obj_list.erase(it); + } else { + it++; + } + } +#endif + + std::vector scanned_objs; + // step 3: parse meta and insert into reg of non preexising + // main thread + // TODO: check timestamps of preexisting and reload? mark external/remote dirty? + for (const auto& it : file_obj_list) { + nlohmann::json j; + if (it.meta_ext == ".meta.msgpack") { + std::ifstream file(it.obj_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary); + if (!file.is_open()) { + std::cout << "FS error: failed opening meta " << it.obj_path << "\n"; + continue; + } + + // file is a msgpack within a msgpack + + std::vector full_meta_data; + { // read meta file + // figure out size + file.seekg(0, file.end); + uint64_t file_size = file.tellg(); + file.seekg(0, file.beg); + + full_meta_data.resize(file_size); + + file.read(reinterpret_cast(full_meta_data.data()), full_meta_data.size()); + } + + const auto meta_header_j = nlohmann::json::from_msgpack(full_meta_data, true, false); + + if (!meta_header_j.is_array() || meta_header_j.size() < 4) { + std::cerr << "FS error: broken binary meta " << it.obj_path << "\n"; + continue; + } + + if (meta_header_j.at(0) != "SOLMET") { + std::cerr << "FS error: wrong magic '" << meta_header_j.at(0) << "' in meta " << it.obj_path << "\n"; + continue; + } + + Encryption meta_enc = meta_header_j.at(1); + if (meta_enc != Encryption::NONE) { + std::cerr << "FS error: unknown encryption " << it.obj_path << "\n"; + continue; + } + + Compression meta_comp = meta_header_j.at(2); + if (meta_comp != Compression::NONE && meta_comp != Compression::ZSTD) { + std::cerr << "FS error: unknown compression " << it.obj_path << "\n"; + continue; + } + + //const auto& meta_data_ref = meta_header_j.at(3).is_binary()?meta_header_j.at(3):meta_header_j.at(3).at("data"); + if (!meta_header_j.at(3).is_binary()) { + std::cerr << "FS error: meta data not binary " << it.obj_path << "\n"; + continue; + } + const nlohmann::json::binary_t& meta_data_ref = meta_header_j.at(3); + + std::stack> binary_reader_stack; + binary_reader_stack.push(std::make_unique(ByteSpan{meta_data_ref.data(), meta_data_ref.size()})); + + if (!buildStackRead(binary_reader_stack, meta_enc, meta_comp)) { + std::cerr << "FS error: binary reader creation failed " << it.obj_path << "\n"; + continue; + } + + // HACK: read fixed amout of data, but this way if we have neither enc nor comp we pass the span through + auto binary_read_value = binary_reader_stack.top()->read(10*1024*1024); // is 10MiB large enough for meta? + const auto binary_read_span = spanFromRead(binary_read_value); + assert(binary_read_span.size < 10*1024*1024); + + j = nlohmann::json::from_msgpack(binary_read_span, true, false); + } else if (it.meta_ext == ".meta.json") { + std::ifstream file(it.obj_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary); + if (!file.is_open()) { + std::cerr << "FS error: failed opening meta " << it.obj_path << "\n"; + continue; + } + + file >> j; + } else { + assert(false); + } + + if (!j.is_object()) { + std::cerr << "FS error: json in meta is broken " << it.id_str << "\n"; + continue; + } + + // TODO: existing fragment file + //newFragmentFile(); + ObjectHandle oh{_os.registry(), _os.registry().create()}; + oh.emplace(this); + oh.emplace(hex2bin(it.id_str)); + + oh.emplace(it.obj_path.generic_u8string()); + + for (const auto& [k, v] : j.items()) { + // type id from string hash + const auto type_id = entt::hashed_string(k.data(), k.size()); + const auto deserl_fn_it = _sc._deserl_json.find(type_id); + if (deserl_fn_it != _sc._deserl_json.cend()) { + // TODO: check return value + deserl_fn_it->second(oh, v); + } else { + std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n"; + } + } + scanned_objs.push_back(oh); + } + + // TODO: mutex and move code to async and return this list ? + + // throw new frag event here, after loading them all + for (const Object o : scanned_objs) { + _os.throwEventConstruct(o); + } + + return scanned_objs.size(); +} + +void FilesystemStorage::scanPathAsync(std::string path) { + // add path to queue + // HACK: if path is known/any fragment is in the path, this operation blocks (non async) + scanPath(path); // TODO: make async and post result +} + } // backend diff --git a/src/fragment_store/backends/filesystem_storage.hpp b/src/fragment_store/backends/filesystem_storage.hpp index 715bf91d..dc0c8031 100644 --- a/src/fragment_store/backends/filesystem_storage.hpp +++ b/src/fragment_store/backends/filesystem_storage.hpp @@ -5,14 +5,28 @@ namespace backend { struct FilesystemStorage : public StorageBackendI { - FilesystemStorage(ObjectStore2& os); + FilesystemStorage(ObjectStore2& os, std::string_view storage_path = "test_obj_store"); ~FilesystemStorage(void); + // TODO: fix the path for this specific fs? + std::string _storage_path; + bool write(Object o, std::function& data_cb) override; bool read(Object o, std::function& data_cb) override; //// convenience function //nlohmann::json loadFromStorageNJ(FragmentID fid); + + void scanAsync(void); + + private: + size_t scanPath(std::string_view path); + void scanPathAsync(std::string path); + + private: + // this thing needs to change and be facilitated over the OS + // but the json serializer are specific to the backend + SerializerCallbacks _sc; }; } // backend diff --git a/src/fragment_store/convert_frag_to_obj.cpp b/src/fragment_store/convert_frag_to_obj.cpp new file mode 100644 index 00000000..772d7bfa --- /dev/null +++ b/src/fragment_store/convert_frag_to_obj.cpp @@ -0,0 +1,36 @@ +#include "./object_store.hpp" +#include "./backends/filesystem_storage.hpp" + +#include +#include + +int main(int argc, const char** argv) { + if (argc != 3) { + std::cerr << "wrong paramter count, do " << argv[0] << " \n"; + return 1; + } + + if (!std::filesystem::is_directory(argv[1])) { + std::cerr << "input folder is no folder\n"; + } + + std::filesystem::create_directories(argv[2]); + + // we are going to use 2 different OS for convineance, but could be done with 1 too + ObjectStore2 os_src; + ObjectStore2 os_dst; + + backend::FilesystemStorage fsb_src(os_src, argv[1]); + backend::FilesystemStorage fsb_dst(os_dst, argv[2]); + + // add message fragment store too (adds meta?) + + // hookup events + // perform scan (which triggers events) + fsb_dst.scanAsync(); // fill with existing? + fsb_src.scanAsync(); // the scan + + // done + return 0; +} + diff --git a/src/fragment_store/fragment_store.cpp b/src/fragment_store/fragment_store.cpp index fc2b0e36..2a349301 100644 --- a/src/fragment_store/fragment_store.cpp +++ b/src/fragment_store/fragment_store.cpp @@ -139,9 +139,9 @@ FragmentID FragmentStore::newFragmentFile( _reg.emplace(new_frag, id); // file (info) comp - _reg.emplace(new_frag, fragment_file_path.generic_u8string()); + _reg.emplace(new_frag, fragment_file_path.generic_u8string()); - _reg.emplace(new_frag, mft); + _reg.emplace(new_frag, mft); // meta needs to be synced to file std::function empty_data_cb = [](auto*, auto) -> uint64_t { return 0; }; @@ -188,7 +188,7 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(fid)) { + if (!_reg.all_of(fid)) { // not a file fragment? return false; } @@ -196,30 +196,30 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(fid)) { - meta_type = _reg.get(fid).type; + if (_reg.all_of(fid)) { + meta_type = _reg.get(fid).type; } Encryption meta_enc = Encryption::NONE; // TODO: better defaults Compression meta_comp = Compression::NONE; // TODO: better defaults if (meta_type != MetaFileType::TEXT_JSON) { - if (_reg.all_of(fid)) { - meta_enc = _reg.get(fid).enc; + if (_reg.all_of(fid)) { + meta_enc = _reg.get(fid).enc; } - if (_reg.all_of(fid)) { - meta_comp = _reg.get(fid).comp; + if (_reg.all_of(fid)) { + meta_comp = _reg.get(fid).comp; } } else { // we cant have encryption or compression // so we force NONE for TEXT JSON - _reg.emplace_or_replace(fid, Encryption::NONE); - _reg.emplace_or_replace(fid, Compression::NONE); + _reg.emplace_or_replace(fid, Encryption::NONE); + _reg.emplace_or_replace(fid, Compression::NONE); } - std::filesystem::path meta_tmp_path = _reg.get(fid).path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp"; + std::filesystem::path meta_tmp_path = _reg.get(fid).path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp"; meta_tmp_path.replace_filename("." + meta_tmp_path.filename().generic_u8string()); // TODO: make meta comp work with mem compressor //auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp); @@ -241,7 +241,7 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(fid).comp; } - std::filesystem::path data_tmp_path = _reg.get(fid).path + ".tmp"; + std::filesystem::path data_tmp_path = _reg.get(fid).path + ".tmp"; data_tmp_path.replace_filename("." + data_tmp_path.filename().generic_u8string()); auto data_file_stack = buildFileStackWrite(std::string_view{data_tmp_path.generic_u8string()}, data_enc, data_comp); if (data_file_stack.empty()) { @@ -289,7 +289,7 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(fid).path << "'\n"; + std::cerr << "FS error: binary writer creation failed '" << _reg.get(fid).path << "'\n"; return false; } @@ -301,7 +301,7 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(fid).path << "'\n"; + std::cerr << "FS error: binary writer failed '" << _reg.get(fid).path << "'\n"; return false; } } @@ -361,18 +361,18 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(fid).path + ".meta" + metaFileTypeSuffix(meta_type) + _reg.get(fid).path + ".meta" + metaFileTypeSuffix(meta_type) ); std::filesystem::rename( data_tmp_path, - _reg.get(fid).path + _reg.get(fid).path ); // TODO: check return value of renames - if (_reg.all_of(fid)) { - _reg.remove(fid); + if (_reg.all_of(fid)) { + _reg.remove(fid); } return true; @@ -396,13 +396,13 @@ bool FragmentStore::loadFromStorage(FragmentID fid, std::function(fid)) { + if (!_reg.all_of(fid)) { // not a file fragment? // TODO: memory fragments return false; } - const auto& frag_path = _reg.get(fid).path; + const auto& frag_path = _reg.get(fid).path; // TODO: check if metadata dirty? // TODO: what if file changed on disk? @@ -672,7 +672,7 @@ size_t FragmentStore::scanStoragePath(std::string_view path) { FragmentHandle fh{_reg, _reg.create()}; fh.emplace(hex2bin(it.id_str)); - fh.emplace(it.frag_path.generic_u8string()); + fh.emplace(it.frag_path.generic_u8string()); for (const auto& [k, v] : j.items()) { // type id from string hash diff --git a/src/fragment_store/fragment_store.hpp b/src/fragment_store/fragment_store.hpp index d29bff60..5982f70e 100644 --- a/src/fragment_store/fragment_store.hpp +++ b/src/fragment_store/fragment_store.hpp @@ -94,9 +94,5 @@ struct FragmentStore : public FragmentStoreI { private: void registerSerializers(void); // internal comps - // internal actual backends - // TODO: seperate out - bool syncToMemory(FragmentID fid, std::function& data_cb); - bool syncToFile(FragmentID fid, std::function& data_cb); }; diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index a189972d..ce0b1ebe 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -68,7 +68,7 @@ namespace Message::Components { } // Message::Components -namespace Fragment::Components { +namespace ObjectStore::Components { NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesTSRange, begin, end) NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesContact, id) @@ -82,7 +82,7 @@ namespace Fragment::Components { Contact3 e {entt::null}; }; } -} // Fragment::Component +} // ObjectStore::Component void MessageFragmentStore::handleMessage(const Message3Handle& m) { if (_fs_ignore_event) { @@ -133,7 +133,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { assert(static_cast(fh)); // assuming ts range exists - auto& fts_comp = fh.get(); + auto& fts_comp = fh.get(); if (fts_comp.begin <= msg_ts && fts_comp.end >= msg_ts) { fragment_id = fid; @@ -149,7 +149,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { assert(static_cast(fh)); // assuming ts range exists - auto& fts_comp = fh.get(); + auto& fts_comp = fh.get(); const int64_t frag_range = int64_t(fts_comp.end) - int64_t(fts_comp.begin); constexpr static int64_t max_frag_ts_extent {1000*60*60}; @@ -205,17 +205,17 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { fragment_id = fh; - fh.emplace_or_replace().comp = Compression::ZSTD; - fh.emplace_or_replace().comp = Compression::ZSTD; + fh.emplace_or_replace().comp = Compression::ZSTD; + fh.emplace_or_replace().comp = Compression::ZSTD; - auto& new_ts_range = fh.emplace_or_replace(); + auto& new_ts_range = fh.emplace_or_replace(); new_ts_range.begin = msg_ts; new_ts_range.end = msg_ts; { const auto msg_reg_contact = m.registry()->ctx().get(); if (_cr.all_of(msg_reg_contact)) { - fh.emplace(_cr.get(msg_reg_contact).data); + fh.emplace(_cr.get(msg_reg_contact).data); } else { // ? rage quit? } @@ -235,7 +235,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { fid_open.emplace(fragment_id); - std::cout << "MFS: created new fragment " << bin2hex(fh.get().v) << "\n"; + std::cout << "MFS: created new fragment " << bin2hex(fh.get().v) << "\n"; _fs_ignore_event = true; _fs.throwEventConstruct(fh); @@ -296,13 +296,13 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh if (!j.is_array()) { // wrong data - fh.emplace_or_replace(); + fh.emplace_or_replace(); return; } if (j.size() == 0) { // empty array - fh.emplace_or_replace(); + fh.emplace_or_replace(); return; } @@ -389,12 +389,12 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh if (messages_new_or_updated == 0) { // useless frag // TODO: unload? - fh.emplace_or_replace(); + fh.emplace_or_replace(); } } bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry& reg) { - auto& ftsrange = fh.get_or_emplace(Message::getTimeMS(), Message::getTimeMS()); + auto& ftsrange = fh.get_or_emplace(Message::getTimeMS(), Message::getTimeMS()); auto j = nlohmann::json::array(); @@ -476,10 +476,16 @@ MessageFragmentStore::MessageFragmentStore( _rmm.subscribe(this, RegistryMessageModel_Event::message_updated); _rmm.subscribe(this, RegistryMessageModel_Event::message_destroy); - _fs._sc.registerSerializerJson(); - _fs._sc.registerDeSerializerJson(); - _fs._sc.registerSerializerJson(); - _fs._sc.registerDeSerializerJson(); + _fs._sc.registerSerializerJson(); + _fs._sc.registerDeSerializerJson(); + _fs._sc.registerSerializerJson(); + _fs._sc.registerDeSerializerJson(); + + // old + _fs._sc.registerSerializerJson(_fs._sc.component_get_json); + _fs._sc.registerDeSerializerJson(_fs._sc.component_emplace_or_replace_json); + _fs._sc.registerSerializerJson(_fs._sc.component_get_json); + _fs._sc.registerDeSerializerJson(_fs._sc.component_emplace_or_replace_json); _fs.subscribe(this, FragmentStore_Event::fragment_construct); _fs.subscribe(this, FragmentStore_Event::fragment_updated); @@ -589,12 +595,12 @@ float MessageFragmentStore::tick(float) { return 0.05f; } - if (!fh.all_of()) { + if (!fh.all_of()) { return 0.05f; } // get ts range of frag and collide with all curser(s/ranges) - const auto& frag_range = fh.get(); + const auto& frag_range = fh.get(); auto* msg_reg = _rmm.get(c); if (msg_reg == nullptr) { @@ -646,19 +652,19 @@ float MessageFragmentStore::tick(float) { return 0.05f; } - if (!fh.all_of()) { + if (!fh.all_of()) { std::cerr << "MFS error: frag has no range\n"; // ???? msg_reg->ctx().get().erase(fid); return 0.05f; } - if (fh.all_of()) { + if (fh.all_of()) { continue; // skip known empty } // get ts range of frag and collide with all curser(s/ranges) - const auto& [range_begin, range_end] = fh.get(); + const auto& [range_begin, range_end] = fh.get(); if (rangeVisible(range_begin, range_end, *msg_reg)) { std::cout << "MFS: frag hit by vis range\n"; @@ -691,7 +697,7 @@ float MessageFragmentStore::tick(float) { cf.sorted_end.crend(), ts_begin_comp.ts, [&](const FragmentID element, const auto& value) -> bool { - return _fs._reg.get(element).end >= value; + return _fs._reg.get(element).end >= value; } ); @@ -709,7 +715,7 @@ float MessageFragmentStore::tick(float) { // only ok bc next is cheap for (size_t i = 0; i < 5 && _fs._reg.valid(next_frag); next_frag = cf.next(next_frag)) { auto fh = _fs.fragmentHandle(next_frag); - if (fh.any_of()) { + if (fh.any_of()) { continue; // skip known empty } @@ -739,7 +745,7 @@ float MessageFragmentStore::tick(float) { cf.sorted_begin.cend(), ts_end, [&](const FragmentID element, const auto& value) -> bool { - return _fs._reg.get(element).begin < value; + return _fs._reg.get(element).begin < value; } ); @@ -757,7 +763,7 @@ float MessageFragmentStore::tick(float) { // only ok bc next is cheap for (size_t i = 0; i < 5 && _fs._reg.valid(prev_frag); prev_frag = cf.prev(prev_frag)) { auto fh = _fs.fragmentHandle(prev_frag); - if (fh.any_of()) { + if (fh.any_of()) { continue; // skip known empty } @@ -806,7 +812,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) return false; // skip self } - if (!e.e.all_of()) { + if (!e.e.all_of()) { return false; // not for us } @@ -814,7 +820,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) Contact3 frag_contact = entt::null; { // get contact - const auto& frag_contact_id = e.e.get().id; + const auto& frag_contact_id = e.e.get().id; // TODO: id lookup table, this is very inefficent for (const auto& [c_it, id_it] : _cr.view().each()) { if (frag_contact_id == id_it.data) { @@ -826,7 +832,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) // unkown contact return false; } - e.e.emplace_or_replace(frag_contact); + e.e.emplace_or_replace(frag_contact); } // create if not exist @@ -853,23 +859,23 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentUpdated& e) { return false; // skip self } - if (!e.e.all_of()) { + if (!e.e.all_of()) { return false; // not for us } // since its an update, we might have it associated, or not // its also possible it was tagged as empty - e.e.remove(); + e.e.remove(); Contact3 frag_contact = entt::null; { // get contact // probably cached already - if (e.e.all_of()) { - frag_contact = e.e.get().e; + if (e.e.all_of()) { + frag_contact = e.e.get().e; } if (!_cr.valid(frag_contact)) { - const auto& frag_contact_id = e.e.get().id; + const auto& frag_contact_id = e.e.get().id; // TODO: id lookup table, this is very inefficent for (const auto& [c_it, id_it] : _cr.view().each()) { if (frag_contact_id == id_it.data) { @@ -881,7 +887,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentUpdated& e) { // unkown contact return false; } - e.e.emplace_or_replace(frag_contact); + e.e.emplace_or_replace(frag_contact); } } @@ -921,15 +927,15 @@ bool Message::Components::ContactFragments::insert(FragmentHandle frag) { sorted_begin.cbegin(), sorted_begin.cend(), [frag](const FragmentID a) -> bool { - const auto begin_a = frag.registry()->get(a).begin; - const auto begin_frag = frag.get().begin; + const auto begin_a = frag.registry()->get(a).begin; + const auto begin_frag = frag.get().begin; if (begin_a > begin_frag) { return true; } else if (begin_a < begin_frag) { return false; } else { // equal ts, we need to fall back to id (id can not be equal) - return isLess(frag.get().v, frag.registry()->get(a).v); + return isLess(frag.get().v, frag.registry()->get(a).v); } } ); @@ -946,15 +952,15 @@ bool Message::Components::ContactFragments::insert(FragmentHandle frag) { sorted_end.cbegin(), sorted_end.cend(), [frag](const FragmentID a) -> bool { - const auto end_a = frag.registry()->get(a).end; - const auto end_frag = frag.get().end; + const auto end_a = frag.registry()->get(a).end; + const auto end_frag = frag.get().end; if (end_a > end_frag) { return true; } else if (end_a < end_frag) { return false; } else { // equal ts, we need to fall back to id (id can not be equal) - return isLess(frag.get().v, frag.registry()->get(a).v); + return isLess(frag.get().v, frag.registry()->get(a).v); } } ); diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index ac3813a5..179ece15 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -54,7 +54,7 @@ namespace Message::Components { } // Message::Components -namespace Fragment::Components { +namespace ObjectStore::Components { struct MessagesTSRange { // timestamp range within the fragment uint64_t begin {0}; // newer msg -> higher number @@ -67,6 +67,12 @@ namespace Fragment::Components { // TODO: add src contact (self id) +} // ObjectStore::Components + +// old +namespace Fragment::Components { + struct MessagesTSRange : public ObjComp::MessagesTSRange {}; + struct MessagesContact : public ObjComp::MessagesContact {}; } // Fragment::Components // handles fragments for messages diff --git a/src/fragment_store/meta_components.hpp b/src/fragment_store/meta_components.hpp index a8ec20e5..f0dcf57a 100644 --- a/src/fragment_store/meta_components.hpp +++ b/src/fragment_store/meta_components.hpp @@ -1,12 +1,13 @@ #pragma once #include "./types.hpp" +#include "object_store.hpp" #include #include #include -namespace Fragment::Components { +namespace ObjectStore::Components { // TODO: is this special and should this be saved to meta or not (its already in the file name on disk) struct ID { @@ -25,18 +26,6 @@ namespace Fragment::Components { // meta that is not written to (meta-)file namespace Ephemeral { - // excluded from file meta - struct FilePath { - // contains store path, if any - std::string path; - }; - - // TODO: seperate into remote and local? - // (remote meaning eg. the file on disk was changed by another program) - struct DirtyTag {}; - - - // type as comp struct MetaFileType { ::MetaFileType type {::MetaFileType::TEXT_JSON}; }; @@ -49,11 +38,38 @@ namespace Fragment::Components { Compression comp {Compression::NONE}; }; + struct Backend { + // TODO: shared_ptr instead?? + StorageBackendI* ptr; + }; + + // excluded from file meta + // TODO: move to backend specific + struct FilePath { + // contains store path, if any + std::string path; + }; + + // TODO: seperate into remote and local? + // (remote meaning eg. the file on disk was changed by another program) + struct DirtyTag {}; + } // Ephemeral } // Components // shortened to save bytes (until I find a way to save by ID in msgpack) +namespace ObjComp = ObjectStore::Components; + +// old names +namespace Fragment::Components { + //struct ID {}; + //struct DataEncryptionType {}; + //struct DataCompressionType {}; + struct ID : public ObjComp::ID {}; + struct DataEncryptionType : public ObjComp::DataEncryptionType {}; + struct DataCompressionType : public ObjComp::DataCompressionType {}; +} namespace FragComp = Fragment::Components; #include "./meta_components_id.inl" diff --git a/src/fragment_store/meta_components_id.inl b/src/fragment_store/meta_components_id.inl index c912cce7..d9f7de37 100644 --- a/src/fragment_store/meta_components_id.inl +++ b/src/fragment_store/meta_components_id.inl @@ -18,6 +18,10 @@ constexpr std::string_view entt::type_name::value() noexcept { \ // cross compiler stable ids +DEFINE_COMP_ID(ObjComp::DataEncryptionType) +DEFINE_COMP_ID(ObjComp::DataCompressionType) + +// old stuff DEFINE_COMP_ID(FragComp::DataEncryptionType) DEFINE_COMP_ID(FragComp::DataCompressionType) diff --git a/src/fragment_store/object_store.cpp b/src/fragment_store/object_store.cpp index 4fc77860..146f6778 100644 --- a/src/fragment_store/object_store.cpp +++ b/src/fragment_store/object_store.cpp @@ -22,43 +22,7 @@ bool StorageBackendI::write(Object o, const ByteSpan data) { return write(o, fn_cb); } -static bool serl_json_data_enc_type(const ObjectHandle oh, nlohmann::json& out) { - out = static_cast>( - oh.get().enc - ); - return true; -} - -static bool deserl_json_data_enc_type(ObjectHandle oh, const nlohmann::json& in) { - oh.emplace_or_replace( - static_cast( - static_cast>(in) - ) - ); - return true; -} - -static bool serl_json_data_comp_type(const ObjectHandle oh, nlohmann::json& out) { - out = static_cast>( - oh.get().comp - ); - return true; -} - -static bool deserl_json_data_comp_type(ObjectHandle oh, const nlohmann::json& in) { - oh.emplace_or_replace( - static_cast( - static_cast>(in) - ) - ); - return true; -} - ObjectStore2::ObjectStore2(void) { - _sc.registerSerializerJson(serl_json_data_enc_type); - _sc.registerDeSerializerJson(deserl_json_data_enc_type); - _sc.registerSerializerJson(serl_json_data_comp_type); - _sc.registerDeSerializerJson(deserl_json_data_comp_type); } ObjectStore2::~ObjectStore2(void) { diff --git a/src/fragment_store/object_store.hpp b/src/fragment_store/object_store.hpp index bcfff21f..3ce56bbd 100644 --- a/src/fragment_store/object_store.hpp +++ b/src/fragment_store/object_store.hpp @@ -75,8 +75,6 @@ struct ObjectStore2 : public ObjectStoreEventProviderI { ObjectRegistry _reg; - SerializerCallbacks _sc; - // TODO: default backend? ObjectStore2(void);