From f287348550f2e6299334d264ebab81b713a034f9 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sat, 13 Apr 2024 19:13:18 +0200 Subject: [PATCH] introduce message fragments version 2 (msgpack) more smaller refactors --- src/fragment_store/internal_mfs_contexts.cpp | 22 ++--- src/fragment_store/internal_mfs_contexts.hpp | 2 +- src/fragment_store/message_fragment_store.cpp | 81 +++++++++++++------ src/fragment_store/message_fragment_store.hpp | 2 +- src/fragment_store/message_serializer.cpp | 18 ++++- .../messages_meta_components.hpp | 3 +- 6 files changed, 88 insertions(+), 40 deletions(-) diff --git a/src/fragment_store/internal_mfs_contexts.cpp b/src/fragment_store/internal_mfs_contexts.cpp index a07ce4c..cadb952 100644 --- a/src/fragment_store/internal_mfs_contexts.cpp +++ b/src/fragment_store/internal_mfs_contexts.cpp @@ -26,7 +26,7 @@ static bool isLess(const std::vector& lhs, const std::vector& } bool Message::Contexts::ContactFragments::insert(ObjectHandle frag) { - if (frags.contains(frag)) { + if (sorted_frags.contains(frag)) { return false; } @@ -85,22 +85,22 @@ bool Message::Contexts::ContactFragments::insert(ObjectHandle frag) { sorted_end.insert(pos, frag); } - frags.emplace(frag, InternalEntry{begin_index, end_index}); + sorted_frags.emplace(frag, InternalEntry{begin_index, end_index}); // now adjust all indicies of fragments coming after the insert position for (size_t i = begin_index + 1; i < sorted_begin.size(); i++) { - frags.at(sorted_begin[i]).i_b = i; + sorted_frags.at(sorted_begin[i]).i_b = i; } for (size_t i = end_index + 1; i < sorted_end.size(); i++) { - frags.at(sorted_end[i]).i_e = i; + sorted_frags.at(sorted_end[i]).i_e = i; } return true; } bool Message::Contexts::ContactFragments::erase(Object frag) { - auto frags_it = frags.find(frag); - if (frags_it == frags.end()) { + auto frags_it = sorted_frags.find(frag); + if (frags_it == sorted_frags.end()) { return false; } @@ -110,7 +110,7 @@ bool Message::Contexts::ContactFragments::erase(Object frag) { sorted_begin.erase(sorted_begin.begin() + frags_it->second.i_b); sorted_end.erase(sorted_end.begin() + frags_it->second.i_e); - frags.erase(frags_it); + sorted_frags.erase(frags_it); return true; } @@ -118,8 +118,8 @@ bool Message::Contexts::ContactFragments::erase(Object frag) { Object Message::Contexts::ContactFragments::prev(Object frag) const { // uses range begin to go back in time - auto it = frags.find(frag); - if (it == frags.end()) { + auto it = sorted_frags.find(frag); + if (it == sorted_frags.end()) { return entt::null; } @@ -134,8 +134,8 @@ Object Message::Contexts::ContactFragments::prev(Object frag) const { Object Message::Contexts::ContactFragments::next(Object frag) const { // uses range end to go forward in time - auto it = frags.find(frag); - if (it == frags.end()) { + auto it = sorted_frags.find(frag); + if (it == sorted_frags.end()) { return entt::null; } diff --git a/src/fragment_store/internal_mfs_contexts.hpp b/src/fragment_store/internal_mfs_contexts.hpp index 99aecd1..4283de8 100644 --- a/src/fragment_store/internal_mfs_contexts.hpp +++ b/src/fragment_store/internal_mfs_contexts.hpp @@ -23,7 +23,7 @@ namespace Message::Contexts { size_t i_b; size_t i_e; }; - entt::dense_map frags; + entt::dense_map sorted_frags; // add 2 sorted contact lists for both range begin and end // TODO: adding and removing becomes expensive with enough frags, consider splitting or heap diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index 0d09965..7284432 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -5,6 +5,9 @@ #include #include "../json/message_components.hpp" +#include "messages_meta_components.hpp" +#include "nlohmann/json_fwd.hpp" +#include "solanaceae/util/span.hpp" #include @@ -55,7 +58,16 @@ static nlohmann::json loadFromStorageNJ(ObjectHandle oh) { return false; } - return nlohmann::json::parse(tmp_buffer, nullptr, false); + const auto obj_version = oh.get().v; + + if (obj_version == 1) { + return nlohmann::json::parse(tmp_buffer, nullptr, false); + } else if (obj_version == 2) { + return nlohmann::json::from_msgpack(tmp_buffer, true, false); + } else { + assert(false); + return {}; + } } void MessageFragmentStore::handleMessage(const Message3Handle& m) { @@ -228,13 +240,13 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { m.emplace_or_replace(fragment_id); // in this case we know the fragment needs an update - for (const auto& it : _fuid_save_queue) { + for (const auto& it : _frag_save_queue) { if (it.id == fragment_id) { // already in queue return; // done } } - _fuid_save_queue.push_back({Message::getTimeMS(), {_os.registry(), fragment_id}, m.registry()}); + _frag_save_queue.push_back({Message::getTimeMS(), {_os.registry(), fragment_id}, m.registry()}); return; // done } @@ -253,7 +265,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { if (fid_open.contains(msg_fh)) { // TODO: dedup events // TODO: cooldown per fragsave - _fuid_save_queue.push_back({Message::getTimeMS(), msg_fh, m.registry()}); + _frag_save_queue.push_back({Message::getTimeMS(), msg_fh, m.registry()}); return; } @@ -269,7 +281,20 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // need update from frag void MessageFragmentStore::loadFragment(Message3Registry& reg, ObjectHandle fh) { std::cout << "MFS: loadFragment\n"; - const auto j = loadFromStorageNJ(fh); + // version HAS to be set, or we just fail + if (!fh.all_of()) { + std::cerr << "MFS error: nope, object without version, cant load\n"; + return; + } + + nlohmann::json j; + const auto obj_version = fh.get().v; + if (obj_version == 1 || obj_version == 2) { + j = loadFromStorageNJ(fh); // also handles version and json/msgpack + } else { + std::cerr << "MFS error: nope, object with unknown version, cant load\n"; + return; + } if (!j.is_array()) { // wrong data @@ -377,7 +402,7 @@ bool MessageFragmentStore::syncFragToStorage(ObjectHandle fh, Message3Registry& // TODO: does every message have ts? auto msg_view = reg.view(); - // we also assume all messages have fid + // we also assume all messages have an associated object for (auto it = msg_view.rbegin(), it_end = msg_view.rend(); it != it_end; it++) { const Message3 m = *it; @@ -385,12 +410,13 @@ bool MessageFragmentStore::syncFragToStorage(ObjectHandle fh, Message3Registry& continue; } - // require msg for now + // filter: require msg for now + // this will be removed in the future if (!reg.any_of(m)) { continue; } - if (_fuid_save_queue.front().id != reg.get(m).o) { + if (_frag_save_queue.front().id != reg.get(m).o) { continue; // not ours } @@ -430,13 +456,20 @@ bool MessageFragmentStore::syncFragToStorage(ObjectHandle fh, Message3Registry& // we cant skip if array is empty (in theory it will not be empty later on) - // if save as binary - //nlohmann::json::to_msgpack(j); - auto j_dump = j.dump(2, ' ', true); + std::vector data_to_save; + const auto obj_version = fh.get_or_emplace().v; + if (obj_version == 1) { + auto j_dump = j.dump(2, ' ', true); + data_to_save = std::vector(j_dump.cbegin(), j_dump.cend()); + } else if (obj_version == 2) { + data_to_save = nlohmann::json::to_msgpack(j); + } else { + std::cerr << "MFS error: unknown object version\n"; + assert(false); + } assert(fh.all_of()); auto* backend = fh.get().ptr; - //if (_os.syncToStorage(fh, reinterpret_cast(j_dump.data()), j_dump.size())) { - if (backend->write(fh, {reinterpret_cast(j_dump.data()), j_dump.size()})) { + if (backend->write(fh, {reinterpret_cast(data_to_save.data()), data_to_save.size()})) { // TODO: make this better, should this be called on fail? should this be called before sync? (prob not) _fs_ignore_event = true; _os.throwEventUpdate(fh); @@ -480,12 +513,12 @@ MessageFragmentStore::MessageFragmentStore( } MessageFragmentStore::~MessageFragmentStore(void) { - while (!_fuid_save_queue.empty()) { - auto fh = _fuid_save_queue.front().id; - auto* reg = _fuid_save_queue.front().reg; + while (!_frag_save_queue.empty()) { + auto fh = _frag_save_queue.front().id; + auto* reg = _frag_save_queue.front().reg; assert(reg != nullptr); syncFragToStorage(fh, *reg); - _fuid_save_queue.pop_front(); // pop unconditionally + _frag_save_queue.pop_front(); // pop unconditionally } } @@ -538,14 +571,14 @@ static bool rangeVisible(uint64_t range_begin, uint64_t range_end, const Message float MessageFragmentStore::tick(float) { const auto ts_now = Message::getTimeMS(); // sync dirty fragments here - if (!_fuid_save_queue.empty()) { + if (!_frag_save_queue.empty()) { // wait 10sec before saving - if (_fuid_save_queue.front().ts_since_dirty + 10*1000 <= ts_now) { - auto fh = _fuid_save_queue.front().id; - auto* reg = _fuid_save_queue.front().reg; + if (_frag_save_queue.front().ts_since_dirty + 10*1000 <= ts_now) { + auto fh = _frag_save_queue.front().id; + auto* reg = _frag_save_queue.front().reg; assert(reg != nullptr); if (syncFragToStorage(fh, *reg)) { - _fuid_save_queue.pop_front(); + _frag_save_queue.pop_front(); } } } @@ -612,13 +645,13 @@ float MessageFragmentStore::tick(float) { // that is not already loaded !! if (msg_reg->ctx().contains()) { const auto& cf = msg_reg->ctx().get(); - if (!cf.frags.empty()) { + if (!cf.sorted_frags.empty()) { if (!msg_reg->ctx().contains()) { msg_reg->ctx().emplace(); } const auto& loaded_frags = msg_reg->ctx().get().loaded_frags; - for (const auto& [fid, si] : msg_reg->ctx().get().frags) { + for (const auto& [fid, si] : msg_reg->ctx().get().sorted_frags) { if (loaded_frags.contains(fid)) { continue; } diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index 2f13aca..f7ec69f 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -86,7 +86,7 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public ObjectSto ObjectHandle id; Message3Registry* reg{nullptr}; }; - std::deque _fuid_save_queue; + std::deque _frag_save_queue; struct ECQueueEntry final { ObjectHandle fid; diff --git a/src/fragment_store/message_serializer.cpp b/src/fragment_store/message_serializer.cpp index 35823e6..7b86c5a 100644 --- a/src/fragment_store/message_serializer.cpp +++ b/src/fragment_store/message_serializer.cpp @@ -1,5 +1,6 @@ #include "./message_serializer.hpp" +#include #include #include @@ -39,6 +40,7 @@ bool MessageSerializerCallbacks::component_get_json bool MessageSerializerCallbacks::component_emplace_or_replace_json(MessageSerializerCallbacks& msc, Handle h, const nlohmann::json& j) { if (j.is_null()) { @@ -47,7 +49,12 @@ bool MessageSerializerCallbacks::component_emplace_or_replace_json id = j.is_binary()?j:j["bytes"]; + std::vector id; + if (j.is_binary()) { + id = j.get_binary(); + } else { + j["bytes"].get_to(id); + } Contact3 other_c = findContactByID(msc.cr, id); if (!msc.cr.valid(other_c)) { @@ -83,6 +90,7 @@ bool MessageSerializerCallbacks::component_get_json bool MessageSerializerCallbacks::component_emplace_or_replace_json(MessageSerializerCallbacks& msc, Handle h, const nlohmann::json& j) { if (j.is_null()) { @@ -91,7 +99,12 @@ bool MessageSerializerCallbacks::component_emplace_or_replace_json id = j.is_binary()?j:j["bytes"]; + std::vector id; + if (j.is_binary()) { + id = j.get_binary(); + } else { + j["bytes"].get_to(id); + } Contact3 other_c = findContactByID(msc.cr, id); if (!msc.cr.valid(other_c)) { @@ -105,3 +118,4 @@ bool MessageSerializerCallbacks::component_emplace_or_replace_json text_json - uint16_t v {1}; + // 2 -> msgpack + uint16_t v {2}; }; struct MessagesTSRange {