From 3d0863ff9a50082cba303f450c4560a6557f9c1a Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sat, 17 Feb 2024 11:49:03 +0100 Subject: [PATCH] basically working, but some dup glitch is still there --- src/fragment_store/fragment_store.cpp | 62 ++++++- src/fragment_store/fragment_store.hpp | 6 + src/fragment_store/message_fragment_store.cpp | 166 ++++++++++++++---- src/fragment_store/message_fragment_store.hpp | 7 +- src/fragment_store/message_serializer.cpp | 4 +- 5 files changed, 203 insertions(+), 42 deletions(-) diff --git a/src/fragment_store/fragment_store.cpp b/src/fragment_store/fragment_store.cpp index 3c0a4cc..98a08e0 100644 --- a/src/fragment_store/fragment_store.cpp +++ b/src/fragment_store/fragment_store.cpp @@ -370,6 +370,64 @@ bool FragmentStore::syncToStorage(FragmentID fid, const uint8_t* data, const uin return syncToStorage(fid, fn_cb); } +bool FragmentStore::loadFromStorage(FragmentID fid, std::function& data_cb) { + if (!_reg.valid(fid)) { + return false; + } + + if (!_reg.all_of(fid)) { + // not a file fragment? + // TODO: memory fragments + return false; + } + + const auto& frag_path = _reg.get(fid).path; + + // TODO: check if metadata dirty? + // TODO: what if file changed on disk? + + std::cout << "FS: loading fragment '" << frag_path << "'\n"; + + std::ifstream data_file{ + frag_path, + std::ios::in | std::ios::binary // always binary, also for text + }; + + if (!data_file.is_open()) { + std::cerr << "FS error: fragment data file failed to open '" << frag_path << "'\n"; + // error + return false; + } + + std::array buffer; + uint64_t buffer_actual_size {0}; + do { + data_file.read(reinterpret_cast(buffer.data()), buffer.size()); + buffer_actual_size = data_file.gcount(); + + if (buffer_actual_size == 0) { + break; + } + + data_cb(buffer.data(), buffer_actual_size); + } while (buffer_actual_size == buffer.size() && !data_file.eof()); + + return true; +} + +nlohmann::json FragmentStore::loadFromStorageNJ(FragmentID fid) { + std::vector tmp_buffer; + std::function cb = [&tmp_buffer](const uint8_t* buffer, const uint64_t buffer_size) { + tmp_buffer.insert(tmp_buffer.end(), buffer, buffer+buffer_size); + }; + + if (!loadFromStorage(fid, cb)) { + return nullptr; + } + + return nlohmann::json::parse(tmp_buffer); +} + size_t FragmentStore::scanStoragePath(std::string_view path) { if (path.empty()) { path = _default_store_path; @@ -505,7 +563,7 @@ size_t FragmentStore::scanStoragePath(std::string_view path) { // read binary header assert(false); } else if (it.meta_ext == ".meta.json") { - std::ifstream file(it.frag_path.generic_u8string() + it.meta_ext); + std::ifstream file(it.frag_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary); if (!file.is_open()) { std::cout << "FS error: failed opening meta " << it.frag_path << "\n"; continue; @@ -523,6 +581,8 @@ size_t FragmentStore::scanStoragePath(std::string_view path) { FragmentHandle fh{_reg, _reg.create()}; fh.emplace(hex2bin(it.id_str)); + fh.emplace(it.frag_path.generic_u8string()); + for (const auto& [k, v] : j.items()) { // type id from string hash const auto type_id = entt::hashed_string(k.data(), k.size()); diff --git a/src/fragment_store/fragment_store.hpp b/src/fragment_store/fragment_store.hpp index 0aec782..811b4be 100644 --- a/src/fragment_store/fragment_store.hpp +++ b/src/fragment_store/fragment_store.hpp @@ -82,6 +82,12 @@ struct FragmentStore : public FragmentStoreI { bool syncToStorage(FragmentID fid, std::function& data_cb); bool syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size); + // ========== load fragment data from storage ========== + using read_from_storage_put_data_cb = void(const uint8_t* buffer, const uint64_t buffer_size); + bool loadFromStorage(FragmentID fid, std::function& data_cb); + // convenience function + nlohmann::json loadFromStorageNJ(FragmentID fid); + // fragment discovery? // returns number of new fragments size_t scanStoragePath(std::string_view path); diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index e61112d..b81ca07 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -7,10 +7,13 @@ #include +#include #include #include #include +// https://youtu.be/CU2exyhYPfA + namespace Message::Components { // ctx @@ -46,43 +49,6 @@ namespace Fragment::Components { NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesContact, id) } // Fragment::Components -template -static bool serl_json_default(void* comp, nlohmann::json& out) { - if constexpr (!std::is_empty_v) { - out = *reinterpret_cast(comp); - } // do nothing if empty type - return true; -} - -static bool serl_json_msg_ts_range(void* comp, nlohmann::json& out) { - if (comp == nullptr) { - return false; - } - - out = nlohmann::json::object(); - - auto& r_comp = *reinterpret_cast(comp); - - out["begin"] = r_comp.begin; - out["end"] = r_comp.end; - - return true; -} - -static bool serl_json_msg_c_id(void* comp, nlohmann::json& out) { - if (comp == nullptr) { - return false; - } - - out = nlohmann::json::object(); - - auto& r_comp = *reinterpret_cast(comp); - - out["id"] = r_comp.id; - - return true; -} - void MessageFragmentStore::handleMessage(const Message3Handle& m) { if (!static_cast(m)) { return; // huh? @@ -95,6 +61,30 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { if (!m.registry()->ctx().contains()) { // first message in this reg m.registry()->ctx().emplace(); + + // TODO: move this to async + // new reg -> load all fragments for this contact (for now, ranges later) + for (const auto& [fid, tsrange, fmc] : _fs._reg.view().each()) { + Contact3 frag_contact = entt::null; + // TODO: id lookup table, this is very inefficent + for (const auto& [c_it, id_it] : _cr.view().each()) { + if (fmc.id == id_it.data) { + //h.emplace_or_replace(c_it); + //return true; + frag_contact = c_it; + break; + } + } + if (!_cr.valid(frag_contact)) { + // unkown contact + continue; + } + + // registry is the same as the one the message event is for + if (static_cast(_rmm).get(frag_contact) == m.registry()) { + loadFragment(*m.registry(), FragmentHandle{_fs._reg, fid}); + } + } } auto& fuid_open = m.registry()->ctx().get().fuid_open; @@ -209,6 +199,60 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // on new and update: mark as fragment dirty } +void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh) { + std::cout << "MFS: loadFragment\n"; + const auto j = _fs.loadFromStorageNJ(fh); + + if (!j.is_array()) { + // wrong data + return; + } + + for (const auto& j_entry : j) { + auto new_real_msg = Message3Handle{reg, reg.create()}; + // load into staging reg + for (const auto& [k, v] : j_entry.items()) { + std::cout << "K:" << k << " V:" << v.dump() << "\n"; + const auto type_id = entt::hashed_string(k.data(), k.size()); + const auto deserl_fn_it = _sc._deserl_json.find(type_id); + if (deserl_fn_it != _sc._deserl_json.cend()) { + try { + if (!deserl_fn_it->second(_sc, new_real_msg, v)) { + std::cerr << "MFS error: failed deserializing '" << k << "'\n"; + } + } catch(...) { + std::cerr << "MFS error: failed deserializing (threw) '" << k << "'\n"; + } + } else { + std::cerr << "MFS warning: missing deserializer for meta key '" << k << "'\n"; + } + } + + new_real_msg.emplace_or_replace(fh.get()); + + // TODO: dup checking + const bool is_dup {false}; + + // dup check (hacky, specific to protocols) + if (is_dup) { + // -> merge with preexisting + // -> throw update + reg.destroy(new_real_msg); + //_rmm.throwEventUpdate(reg, new_real_msg); + } else { + if (!new_real_msg.all_of()) { + // does not have needed components to be stand alone + reg.destroy(new_real_msg); + std::cerr << "MFS warning: message with missing basic compoments\n"; + continue; + } + + // -> throw create + _rmm.throwEventConstruct(reg, new_real_msg); + } + } +} + MessageFragmentStore::MessageFragmentStore( Contact3Registry& cr, RegistryMessageModel& rmm, @@ -250,6 +294,8 @@ MessageFragmentStore::MessageFragmentStore( _sc.registerDeSerializerJson(); _sc.registerSerializerJson(); _sc.registerDeSerializerJson(); + + _fs.subscribe(this, FragmentStore_Event::fragment_construct); } MessageFragmentStore::~MessageFragmentStore(void) { @@ -302,7 +348,7 @@ float MessageFragmentStore::tick(float time_delta) { //nlohmann::json::to_msgpack(j); auto j_dump = j.dump(2, ' ', true); if (_fs.syncToStorage(fid, reinterpret_cast(j_dump.data()), j_dump.size())) { - std::cout << "MFS: dumped " << j_dump << "\n"; + //std::cout << "MFS: dumped " << j_dump << "\n"; // succ _fuid_save_queue.pop(); } @@ -327,3 +373,47 @@ bool MessageFragmentStore::onEvent(const Message::Events::MessageUpdated& e) { // TODO: handle deletes? diff between unload? +bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) { + if (_fs_ignore_event) { + return false; // skip self + } + + if (!e.e.all_of()) { + return false; // not for us + } + + // TODO: are we sure it is a *new* fragment? + + //std::cout << "MFS: got frag for us!\n"; + + Contact3 frag_contact = entt::null; + { // get contact + const auto& frag_contact_id = e.e.get().id; + // TODO: id lookup table, this is very inefficent + for (const auto& [c_it, id_it] : _cr.view().each()) { + if (frag_contact_id == id_it.data) { + //h.emplace_or_replace(c_it); + //return true; + frag_contact = c_it; + break; + } + } + if (!_cr.valid(frag_contact)) { + // unkown contact + return false; + } + } + + // only load if msg reg open + auto* msg_reg = static_cast(_rmm).get(frag_contact); + if (msg_reg == nullptr) { + // msg reg not created yet + return false; + } + + // TODO: should this be done async / on tick() instead of on event? + loadFragment(*msg_reg, e.e); + + return false; +} + diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index 39357a5..cc60870 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -42,7 +42,7 @@ namespace Fragment::Components { // on new message: assign fuid // on new and update: mark as fragment dirty // on delete: mark as fragment dirty? -class MessageFragmentStore : public RegistryMessageModelEventI { +class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentStoreEventI { protected: Contact3Registry& _cr; RegistryMessageModel& _rmm; @@ -54,6 +54,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI { void handleMessage(const Message3Handle& m); + void loadFragment(Message3Registry& reg, FragmentHandle fh); + struct QueueEntry final { uint64_t ts_since_dirty{0}; std::vector id; @@ -76,5 +78,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI { protected: // rmm bool onEvent(const Message::Events::MessageConstruct& e) override; bool onEvent(const Message::Events::MessageUpdated& e) override; + + protected: // fs + bool onEvent(const Fragment::Events::FragmentConstruct& e) override; }; diff --git a/src/fragment_store/message_serializer.cpp b/src/fragment_store/message_serializer.cpp index 98c6778..78362e9 100644 --- a/src/fragment_store/message_serializer.cpp +++ b/src/fragment_store/message_serializer.cpp @@ -36,7 +36,7 @@ bool MessageSerializerCallbacks::component_emplace_or_replace_json>(j); + const auto id = static_cast>(j.is_binary()?j:j["bytes"]); // TODO: id lookup table, this is very inefficent for (const auto& [c_it, id_it] : msc.cr.view().each()) { @@ -79,7 +79,7 @@ bool MessageSerializerCallbacks::component_emplace_or_replace_json>(j); + const auto id = static_cast>(j.is_binary()?j:j["bytes"]); // TODO: id lookup table, this is very inefficent for (const auto& [c_it, id_it] : msc.cr.view().each()) {