diff --git a/src/fragment_store/fragment_store.cpp b/src/fragment_store/fragment_store.cpp index be4c3938..97dd67a5 100644 --- a/src/fragment_store/fragment_store.cpp +++ b/src/fragment_store/fragment_store.cpp @@ -3,6 +3,8 @@ #include #include +#include +#include #include @@ -13,6 +15,7 @@ #include #include #include +#include #include #include @@ -51,7 +54,7 @@ FragmentStore::FragmentStore( registerSerializers(); } -FragmentStore::FragmentHandle FragmentStore::fragmentHandle(FragmentID fid) { +FragmentHandle FragmentStore::fragmentHandle(FragmentID fid) { return {_reg, fid}; } @@ -340,6 +343,184 @@ bool FragmentStore::syncToStorage(FragmentID fid, const uint8_t* data, const uin return syncToStorage(fid, fn_cb); } +size_t FragmentStore::scanStoragePath(std::string_view path) { + if (path.empty()) { + path = _default_store_path; + } + // TODO: extract so async can work (or/and make iteratable generator) + + if (!std::filesystem::is_directory(path)) { + std::cerr << "FS error: scan path not a directory '" << path << "'\n"; + return 0; + } + + // step 1: make snapshot of files, validate metafiles and save id/path+meta.ext + // can be extra thread (if non vfs) + struct FragFileEntry { + std::string id_str; + std::filesystem::path frag_path; + std::string meta_ext; + + bool operator==(const FragFileEntry& other) const { + // only compare by id + return id_str == other.id_str; + } + }; + struct FragFileEntryHash { + size_t operator()(const FragFileEntry& it) const { + return entt::hashed_string(it.id_str.data(), it.id_str.size()); + } + }; + entt::dense_set file_frag_list; + + std::filesystem::path storage_path{path}; + + auto handle_file = [&](const std::filesystem::path& file_path) { + if (!std::filesystem::is_regular_file(file_path)) { + return; + } + // handle file + + if (file_path.has_extension()) { + // skip over metadata, assuming only metafiles have extentions (might be wrong?) + // also skips temps + return; + } + + auto relative_path = std::filesystem::proximate(file_path, storage_path); + std::string id_str = relative_path.generic_u8string(); + // delete all '/' + id_str.erase(std::remove(id_str.begin(), id_str.end(), '/'), id_str.end()); + if (id_str.size() % 2 != 0) { + std::cerr << "FS error: non hex fragment uid detected: '" << id_str << "'\n"; + } + + if (file_frag_list.contains(FragFileEntry{id_str, {}, ""})) { + std::cerr << "FS error: fragment duplicate detected: '" << id_str << "'\n"; + return; // skip + } + + const char* meta_ext = ".meta.msgpack"; + { // find meta + // TODO: this as to know all possible extentions + bool has_meta_msgpack = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.msgpack"); + bool has_meta_json = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.json"); + const size_t meta_sum = + (has_meta_msgpack?1:0) + + (has_meta_json?1:0) + ; + + if (meta_sum > 1) { // has multiple + std::cerr << "FS error: fragment with multiple meta files detected: " << id_str << "\n"; + return; // skip + } + + if (meta_sum == 0) { + std::cerr << "FS error: fragment missing meta file detected: " << id_str << "\n"; + return; // skip + } + + if (has_meta_json) { + meta_ext = ".meta.json"; + } + } + + file_frag_list.emplace(FragFileEntry{ + std::move(id_str), + file_path, + meta_ext + }); + }; + + for (const auto& outer_path : std::filesystem::directory_iterator(storage_path)) { + if (std::filesystem::is_regular_file(outer_path)) { + handle_file(outer_path); + } else if (std::filesystem::is_directory(outer_path)) { + // subdir, part of id + for (const auto& inner_path : std::filesystem::directory_iterator(outer_path)) { + //if (std::filesystem::is_regular_file(inner_path)) { + + //// handle file + //} // TODO: support deeper recursion? + handle_file(inner_path); + } + } + } + + std::cout << "FS: scan found:\n"; + for (const auto& it : file_frag_list) { + std::cout << " " << it.id_str << "\n"; + } + + // step 2: check if files preexist in reg + // main thread + // (merge into step 3 and do more error checking?) + for (auto it = file_frag_list.begin(); it != file_frag_list.end();) { + auto id = hex2bin(it->id_str); + auto fid = getFragmentByID(id); + if (_reg.valid(fid)) { + // pre exising (handle differently??) + // check if store differs? + it = file_frag_list.erase(it); + } else { + it++; + } + } + + size_t count {0}; + // step 3: parse meta and insert into reg of non preexising + // main thread + // TODO: check timestamps of preexisting and reload? mark external/remote dirty? + for (const auto& it : file_frag_list) { + nlohmann::json j; + if (it.meta_ext == ".meta.msgpack") { + // uh + // read binary header + } else if (it.meta_ext == ".meta.json") { + std::ifstream file(it.frag_path.generic_u8string() + it.meta_ext); + if (!file.is_open()) { + std::cout << "FS error: failed opening meta " << it.frag_path << "\n"; + continue; + } + + file >> j; + + if (!j.is_object()) { + std::cerr << "FS error: json in meta is broken " << it.id_str << "\n"; + continue; + } + + // TODO: existing fragment file + //newFragmentFile(); + FragmentHandle fh{_reg, _reg.create()}; + fh.emplace(hex2bin(it.id_str)); + + for (const auto& [k, v] : j.items()) { + // type id from string hash + const auto type_id = entt::hashed_string(k.data(), k.size()); + const auto deserl_fn_it = _sc._deserl_json.find(type_id); + if (deserl_fn_it != _sc._deserl_json.cend()) { + // TODO: check return value + deserl_fn_it->second(fh, v); + } else { + std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n"; + } + } + count++; + } else { + assert(false); + } + } + + return count; +} + +void FragmentStore::scanStoragePathAsync(std::string path) { + // add path to queue + // HACK: if path is known/any fragment is in the path, this operation blocks (non async) + scanStoragePath(path); // TODO: make async and post result +} + static bool serl_json_data_enc_type(void* comp, nlohmann::json& out) { if (comp == nullptr) { return false; @@ -352,6 +533,20 @@ static bool serl_json_data_enc_type(void* comp, nlohmann::json& out) { return true; } +static bool deserl_json_data_enc_type(FragmentHandle fh, const nlohmann::json& in) { + // TODO: this is ugly in multiple places + try { + fh.emplace_or_replace( + static_cast( + static_cast>(in) + ) + ); + } catch(...) { + return false; + } + return true; +} + static bool serl_json_data_comp_type(void* comp, nlohmann::json& out) { if (comp == nullptr) { return false; @@ -366,6 +561,7 @@ static bool serl_json_data_comp_type(void* comp, nlohmann::json& out) { void FragmentStore::registerSerializers(void) { _sc.registerSerializerJson(serl_json_data_enc_type); + _sc.registerDeSerializerJson(deserl_json_data_enc_type); _sc.registerSerializerJson(serl_json_data_comp_type); std::cout << "registered serl text json cbs:\n"; diff --git a/src/fragment_store/fragment_store.hpp b/src/fragment_store/fragment_store.hpp index 6dda33de..1025f9e6 100644 --- a/src/fragment_store/fragment_store.hpp +++ b/src/fragment_store/fragment_store.hpp @@ -17,10 +17,11 @@ #include #include -struct FragmentStore : public FragmentStoreI { - using FragmentHandle = entt::basic_handle>; +// fwd +struct SerializerCallbacks; - entt::basic_registry _reg; +struct FragmentStore : public FragmentStoreI { + FragmentRegistry _reg; std::minstd_rand _rng{std::random_device{}()}; std::array _session_uuid_namespace; @@ -87,6 +88,9 @@ struct FragmentStore : public FragmentStoreI { bool syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size); // fragment discovery? + // returns number of new fragments + size_t scanStoragePath(std::string_view path); + void scanStoragePathAsync(std::string path); private: void registerSerializers(void); // internal comps diff --git a/src/fragment_store/fragment_store_i.hpp b/src/fragment_store/fragment_store_i.hpp index 3a77f4c9..be3f9c13 100644 --- a/src/fragment_store/fragment_store_i.hpp +++ b/src/fragment_store/fragment_store_i.hpp @@ -1,9 +1,13 @@ #pragma once +#include + #include // internal id enum class FragmentID : uint32_t {}; +using FragmentRegistry = entt::basic_registry; +using FragmentHandle = entt::basic_handle; struct FragmentStoreI { virtual ~FragmentStoreI(void) {} diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index cba9b6b4..24c4b312 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -64,6 +64,19 @@ static bool serl_json_msg_ts_range(void* comp, nlohmann::json& out) { return true; } +static bool deserl_json_msg_ts_range(FragmentHandle fh, const nlohmann::json& in) { + // TODO: this is ugly in multiple places + try { + fh.emplace_or_replace(FragComp::MessagesTSRange{ + in["begin"], + in["end"] + }); + } catch(...) { + return false; + } + return true; +} + static bool serl_json_msg_c_id(void* comp, nlohmann::json& out) { if (comp == nullptr) { return false; @@ -78,6 +91,18 @@ static bool serl_json_msg_c_id(void* comp, nlohmann::json& out) { return true; } +static bool deserl_json_msg_c_id(FragmentHandle fh, const nlohmann::json& in) { + // TODO: this is ugly in multiple places + try { + fh.emplace_or_replace(FragComp::MessagesContact{ + in["id"] + }); + } catch(...) { + return false; + } + return true; +} + void MessageFragmentStore::handleMessage(const Message3Handle& m) { if (!static_cast(m)) { return; // huh? @@ -210,7 +235,9 @@ MessageFragmentStore::MessageFragmentStore( _rmm.subscribe(this, RegistryMessageModel_Event::message_destroy); _fs._sc.registerSerializerJson(serl_json_msg_ts_range); + _fs._sc.registerDeSerializerJson(deserl_json_msg_ts_range); _fs._sc.registerSerializerJson(serl_json_msg_c_id); + _fs._sc.registerDeSerializerJson(deserl_json_msg_c_id); _sc.registerSerializerJson(serl_json_default); _sc.registerSerializerJson(serl_json_default); @@ -227,9 +254,12 @@ MessageFragmentStore::MessageFragmentStore( _sc.registerSerializerJson(serl_json_default); _sc.registerSerializerJson(serl_json_default); _sc.registerSerializerJson(serl_json_default); + + _fs.scanStoragePath("test_message_store/"); } MessageFragmentStore::~MessageFragmentStore(void) { + // TODO: sync all dirty fragments } float MessageFragmentStore::tick(float time_delta) { diff --git a/src/fragment_store/serializer.hpp b/src/fragment_store/serializer.hpp index a09feec7..bf12c4e6 100644 --- a/src/fragment_store/serializer.hpp +++ b/src/fragment_store/serializer.hpp @@ -5,13 +5,15 @@ #include +#include "./fragment_store_i.hpp" + struct SerializerCallbacks { // nlohmann // json/msgpack using serialize_json_fn = bool(*)(void* comp, nlohmann::json& out); entt::dense_map _serl_json; - using deserialize_json_fn = bool(*)(void* comp, const nlohmann::json& in); + using deserialize_json_fn = bool(*)(FragmentHandle fh, const nlohmann::json& in); entt::dense_map _deserl_json; void registerSerializerJson(serialize_json_fn fn, const entt::type_info& type_info);