diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 23d91384..91722ad0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,6 +10,8 @@ add_library(message_fragment_store ./fragment_store/message_serializer.cpp ./fragment_store/messages_meta_components.hpp ./fragment_store/messages_meta_components_id.inl + ./fragment_store/internal_mfs_contexts.hpp + ./fragment_store/internal_mfs_contexts.cpp ./fragment_store/message_fragment_store.hpp ./fragment_store/message_fragment_store.cpp diff --git a/src/fragment_store/internal_mfs_contexts.cpp b/src/fragment_store/internal_mfs_contexts.cpp new file mode 100644 index 00000000..a07ce4cd --- /dev/null +++ b/src/fragment_store/internal_mfs_contexts.cpp @@ -0,0 +1,149 @@ +#include "./internal_mfs_contexts.hpp" + +#include "./message_fragment_store.hpp" + +#include +#include +#include + +#include + +static bool isLess(const std::vector& lhs, const std::vector& rhs) { + size_t i = 0; + for (; i < lhs.size() && i < rhs.size(); i++) { + if (lhs[i] < rhs[i]) { + return true; + } else if (lhs[i] > rhs[i]) { + return false; + } + // else continue + } + + // here we have equality of common lenths + + // we define smaller arrays to be less + return lhs.size() < rhs.size(); +} + +bool Message::Contexts::ContactFragments::insert(ObjectHandle frag) { + if (frags.contains(frag)) { + return false; + } + + // both sorted arrays are sorted ascending + // so for insertion we search for the last index that is <= and insert after it + // or we search for the first > (or end) and insert before it <--- + // since equal fragments are UB, we can assume they are only > or < + + size_t begin_index {0}; + { // begin + const auto pos = std::find_if( + sorted_begin.cbegin(), + sorted_begin.cend(), + [frag](const Object a) -> bool { + const auto begin_a = frag.registry()->get(a).begin; + const auto begin_frag = frag.get().begin; + if (begin_a > begin_frag) { + return true; + } else if (begin_a < begin_frag) { + return false; + } else { + // equal ts, we need to fall back to id (id can not be equal) + return isLess(frag.get().v, frag.registry()->get(a).v); + } + } + ); + + begin_index = std::distance(sorted_begin.cbegin(), pos); + + // we need to insert before pos (end is valid here) + sorted_begin.insert(pos, frag); + } + + size_t end_index {0}; + { // end + const auto pos = std::find_if_not( + sorted_end.cbegin(), + sorted_end.cend(), + [frag](const Object a) -> bool { + const auto end_a = frag.registry()->get(a).end; + const auto end_frag = frag.get().end; + if (end_a > end_frag) { + return true; + } else if (end_a < end_frag) { + return false; + } else { + // equal ts, we need to fall back to id (id can not be equal) + return isLess(frag.get().v, frag.registry()->get(a).v); + } + } + ); + + end_index = std::distance(sorted_end.cbegin(), pos); + + // we need to insert before pos (end is valid here) + sorted_end.insert(pos, frag); + } + + frags.emplace(frag, InternalEntry{begin_index, end_index}); + + // now adjust all indicies of fragments coming after the insert position + for (size_t i = begin_index + 1; i < sorted_begin.size(); i++) { + frags.at(sorted_begin[i]).i_b = i; + } + for (size_t i = end_index + 1; i < sorted_end.size(); i++) { + frags.at(sorted_end[i]).i_e = i; + } + + return true; +} + +bool Message::Contexts::ContactFragments::erase(Object frag) { + auto frags_it = frags.find(frag); + if (frags_it == frags.end()) { + return false; + } + + assert(sorted_begin.size() == sorted_end.size()); + assert(sorted_begin.size() > frags_it->second.i_b); + + sorted_begin.erase(sorted_begin.begin() + frags_it->second.i_b); + sorted_end.erase(sorted_end.begin() + frags_it->second.i_e); + + frags.erase(frags_it); + + return true; +} + +Object Message::Contexts::ContactFragments::prev(Object frag) const { + // uses range begin to go back in time + + auto it = frags.find(frag); + if (it == frags.end()) { + return entt::null; + } + + const auto src_i = it->second.i_b; + if (src_i > 0) { + return sorted_begin[src_i-1]; + } + + return entt::null; +} + +Object Message::Contexts::ContactFragments::next(Object frag) const { + // uses range end to go forward in time + + auto it = frags.find(frag); + if (it == frags.end()) { + return entt::null; + } + + const auto src_i = it->second.i_e; + if (src_i+1 < sorted_end.size()) { + return sorted_end[src_i+1]; + } + + return entt::null; +} + diff --git a/src/fragment_store/internal_mfs_contexts.hpp b/src/fragment_store/internal_mfs_contexts.hpp new file mode 100644 index 00000000..99aecd18 --- /dev/null +++ b/src/fragment_store/internal_mfs_contexts.hpp @@ -0,0 +1,53 @@ +#pragma once + +#include + +#include +#include + +// everything assumes a single object registry (and unique objects) + +namespace Message::Contexts { + + // ctx + struct OpenFragments { + // only contains fragments with <1024 messages and <2h tsrage (or whatever) + entt::dense_set open_frags; + }; + + // all message fragments of this contact + struct ContactFragments final { + // kept up-to-date by events + struct InternalEntry { + // indecies into the sorted arrays + size_t i_b; + size_t i_e; + }; + entt::dense_map frags; + + // add 2 sorted contact lists for both range begin and end + // TODO: adding and removing becomes expensive with enough frags, consider splitting or heap + std::vector sorted_begin; + std::vector sorted_end; + + // api + // return true if it was actually inserted + bool insert(ObjectHandle frag); + bool erase(Object frag); + // update? (just erase() + insert()) + + // uses range begin to go back in time + Object prev(Object frag) const; + // uses range end to go forward in time + Object next(Object frag) const; + }; + + // all LOADED message fragments + // TODO: merge into ContactFragments (and pull in openfrags) + struct LoadedContactFragments final { + // kept up-to-date by events + entt::dense_set loaded_frags; + }; + +} // Message::Contexts + diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index 90638a98..0d099655 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -1,5 +1,7 @@ #include "./message_fragment_store.hpp" +#include "./internal_mfs_contexts.hpp" + #include #include "../json/message_components.hpp" @@ -20,55 +22,7 @@ // https://youtu.be/CU2exyhYPfA -// everything assumes a single fragment registry - -namespace Message::Components { - - // ctx - struct OpenFragments { - //struct OpenFrag final { - ////std::vector uid; - //FragmentID id; - //}; - // only contains fragments with <1024 messages and <2h tsrage (or whatever) - entt::dense_set fid_open; - }; - - // all message fragments of this contact - struct ContactFragments final { - // kept up-to-date by events - struct InternalEntry { - // indecies into the sorted arrays - size_t i_b; - size_t i_e; - }; - entt::dense_map frags; - - // add 2 sorted contact lists for both range begin and end - // TODO: adding and removing becomes expensive with enough frags, consider splitting or heap - std::vector sorted_begin; - std::vector sorted_end; - - // api - // return true if it was actually inserted - bool insert(ObjectHandle frag); - bool erase(Object frag); - // update? (just erase() + insert()) - - // uses range begin to go back in time - Object prev(Object frag) const; - // uses range end to go forward in time - Object next(Object frag) const; - }; - - // all LOADED message fragments - // TODO: merge into ContactFragments (and pull in openfrags) - struct LoadedContactFragments final { - // kept up-to-date by events - entt::dense_set frags; - }; - -} // Message::Components +// everything assumes a single object registry (and unique objects) namespace ObjectStore::Components { NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesVersion, v) @@ -135,11 +89,11 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // TODO: use fid, seving full fuid for every message consumes alot of memory (and heap frag) if (!m.all_of()) { std::cout << "MFS: new msg missing Object\n"; - if (!m.registry()->ctx().contains()) { - m.registry()->ctx().emplace(); + if (!m.registry()->ctx().contains()) { + m.registry()->ctx().emplace(); } - auto& fid_open = m.registry()->ctx().get().fid_open; + auto& fid_open = m.registry()->ctx().get().open_frags; const auto msg_ts = m.get().ts; // missing fuid @@ -185,10 +139,10 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // assuming ts range exists fts_comp.begin = msg_ts; // extend into the past - if (m.registry()->ctx().contains()) { + if (m.registry()->ctx().contains()) { // should be the case - m.registry()->ctx().get().erase(fh); - m.registry()->ctx().get().insert(fh); + m.registry()->ctx().get().erase(fh); + m.registry()->ctx().get().insert(fh); } @@ -202,10 +156,10 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // assuming ts range exists fts_comp.end = msg_ts; // extend into the future - if (m.registry()->ctx().contains()) { + if (m.registry()->ctx().contains()) { // should be the case - m.registry()->ctx().get().erase(fh); - m.registry()->ctx().get().insert(fh); + m.registry()->ctx().get().erase(fh); + m.registry()->ctx().get().insert(fh); } // TODO: check conditions for open here @@ -245,16 +199,16 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { } // contact frag - if (!m.registry()->ctx().contains()) { - m.registry()->ctx().emplace(); + if (!m.registry()->ctx().contains()) { + m.registry()->ctx().emplace(); } - m.registry()->ctx().get().insert(fh); + m.registry()->ctx().get().insert(fh); // loaded contact frag - if (!m.registry()->ctx().contains()) { - m.registry()->ctx().emplace(); + if (!m.registry()->ctx().contains()) { + m.registry()->ctx().emplace(); } - m.registry()->ctx().get().frags.emplace(fh); + m.registry()->ctx().get().loaded_frags.emplace(fh); fid_open.emplace(fragment_id); @@ -290,11 +244,11 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { return; // TODO: properly handle this case } - if (!m.registry()->ctx().contains()) { - m.registry()->ctx().emplace(); + if (!m.registry()->ctx().contains()) { + m.registry()->ctx().emplace(); } - auto& fid_open = m.registry()->ctx().get().fid_open; + auto& fid_open = m.registry()->ctx().get().open_frags; if (fid_open.contains(msg_fh)) { // TODO: dedup events @@ -330,16 +284,16 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, ObjectHandle fh) } // TODO: this should probably never be the case, since we already know here that it is a msg frag - if (!reg.ctx().contains()) { - reg.ctx().emplace(); + if (!reg.ctx().contains()) { + reg.ctx().emplace(); } - reg.ctx().get().insert(fh); + reg.ctx().get().insert(fh); // mark loaded - if (!reg.ctx().contains()) { - reg.ctx().emplace(); + if (!reg.ctx().contains()) { + reg.ctx().emplace(); } - reg.ctx().get().frags.emplace(fh); + reg.ctx().get().loaded_frags.emplace(fh); size_t messages_new_or_updated {0}; for (const auto& j_entry : j) { @@ -581,23 +535,6 @@ static bool rangeVisible(uint64_t range_begin, uint64_t range_end, const Message return false; } -static bool isLess(const std::vector& lhs, const std::vector& rhs) { - size_t i = 0; - for (; i < lhs.size() && i < rhs.size(); i++) { - if (lhs[i] < rhs[i]) { - return true; - } else if (lhs[i] > rhs[i]) { - return false; - } - // else continue - } - - // here we have equality of common lenths - - // we define smaller arrays to be less - return lhs.size() < rhs.size(); -} - float MessageFragmentStore::tick(float) { const auto ts_now = Message::getTimeMS(); // sync dirty fragments here @@ -673,15 +610,15 @@ float MessageFragmentStore::tick(float) { // first do collision check agains every contact associated fragment // that is not already loaded !! - if (msg_reg->ctx().contains()) { - const auto& cf = msg_reg->ctx().get(); + if (msg_reg->ctx().contains()) { + const auto& cf = msg_reg->ctx().get(); if (!cf.frags.empty()) { - if (!msg_reg->ctx().contains()) { - msg_reg->ctx().emplace(); + if (!msg_reg->ctx().contains()) { + msg_reg->ctx().emplace(); } - const auto& loaded_frags = msg_reg->ctx().get().frags; + const auto& loaded_frags = msg_reg->ctx().get().loaded_frags; - for (const auto& [fid, si] : msg_reg->ctx().get().frags) { + for (const auto& [fid, si] : msg_reg->ctx().get().frags) { if (loaded_frags.contains(fid)) { continue; } @@ -691,14 +628,14 @@ float MessageFragmentStore::tick(float) { if (!static_cast(fh)) { std::cerr << "MFS error: frag is invalid\n"; // WHAT - msg_reg->ctx().get().erase(fid); + msg_reg->ctx().get().erase(fid); return 0.05f; } if (!fh.all_of()) { std::cerr << "MFS error: frag has no range\n"; // ???? - msg_reg->ctx().get().erase(fid); + msg_reg->ctx().get().erase(fid); return 0.05f; } @@ -887,11 +824,11 @@ bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectConstruct& e return false; } - if (!msg_reg->ctx().contains()) { - msg_reg->ctx().emplace(); + if (!msg_reg->ctx().contains()) { + msg_reg->ctx().emplace(); } - msg_reg->ctx().get().erase(e.e); // TODO: can this happen? update - msg_reg->ctx().get().insert(e.e); + msg_reg->ctx().get().erase(e.e); // TODO: can this happen? update + msg_reg->ctx().get().insert(e.e); _event_check_queue.push_back(ECQueueEntry{e.e, frag_contact}); @@ -943,11 +880,11 @@ bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectUpdate& e) { return false; } - if (!msg_reg->ctx().contains()) { - msg_reg->ctx().emplace(); + if (!msg_reg->ctx().contains()) { + msg_reg->ctx().emplace(); } - msg_reg->ctx().get().erase(e.e); // TODO: check/update/fragment update - msg_reg->ctx().get().insert(e.e); + msg_reg->ctx().get().erase(e.e); // TODO: check/update/fragment update + msg_reg->ctx().get().insert(e.e); // TODO: actually load it //_event_check_queue.push_back(ECQueueEntry{e.e, frag_contact}); @@ -955,125 +892,3 @@ bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectUpdate& e) { return false; } -bool Message::Components::ContactFragments::insert(ObjectHandle frag) { - if (frags.contains(frag)) { - return false; - } - - // both sorted arrays are sorted ascending - // so for insertion we search for the last index that is <= and insert after it - // or we search for the first > (or end) and insert before it <--- - // since equal fragments are UB, we can assume they are only > or < - - size_t begin_index {0}; - { // begin - const auto pos = std::find_if( - sorted_begin.cbegin(), - sorted_begin.cend(), - [frag](const Object a) -> bool { - const auto begin_a = frag.registry()->get(a).begin; - const auto begin_frag = frag.get().begin; - if (begin_a > begin_frag) { - return true; - } else if (begin_a < begin_frag) { - return false; - } else { - // equal ts, we need to fall back to id (id can not be equal) - return isLess(frag.get().v, frag.registry()->get(a).v); - } - } - ); - - begin_index = std::distance(sorted_begin.cbegin(), pos); - - // we need to insert before pos (end is valid here) - sorted_begin.insert(pos, frag); - } - - size_t end_index {0}; - { // end - const auto pos = std::find_if_not( - sorted_end.cbegin(), - sorted_end.cend(), - [frag](const Object a) -> bool { - const auto end_a = frag.registry()->get(a).end; - const auto end_frag = frag.get().end; - if (end_a > end_frag) { - return true; - } else if (end_a < end_frag) { - return false; - } else { - // equal ts, we need to fall back to id (id can not be equal) - return isLess(frag.get().v, frag.registry()->get(a).v); - } - } - ); - - end_index = std::distance(sorted_end.cbegin(), pos); - - // we need to insert before pos (end is valid here) - sorted_end.insert(pos, frag); - } - - frags.emplace(frag, InternalEntry{begin_index, end_index}); - - // now adjust all indicies of fragments coming after the insert position - for (size_t i = begin_index + 1; i < sorted_begin.size(); i++) { - frags.at(sorted_begin[i]).i_b = i; - } - for (size_t i = end_index + 1; i < sorted_end.size(); i++) { - frags.at(sorted_end[i]).i_e = i; - } - - return true; -} - -bool Message::Components::ContactFragments::erase(Object frag) { - auto frags_it = frags.find(frag); - if (frags_it == frags.end()) { - return false; - } - - assert(sorted_begin.size() == sorted_end.size()); - assert(sorted_begin.size() > frags_it->second.i_b); - - sorted_begin.erase(sorted_begin.begin() + frags_it->second.i_b); - sorted_end.erase(sorted_end.begin() + frags_it->second.i_e); - - frags.erase(frags_it); - - return true; -} - -Object Message::Components::ContactFragments::prev(Object frag) const { - // uses range begin to go back in time - - auto it = frags.find(frag); - if (it == frags.end()) { - return entt::null; - } - - const auto src_i = it->second.i_b; - if (src_i > 0) { - return sorted_begin[src_i-1]; - } - - return entt::null; -} - -Object Message::Components::ContactFragments::next(Object frag) const { - // uses range end to go forward in time - - auto it = frags.find(frag); - if (it == frags.end()) { - return entt::null; - } - - const auto src_i = it->second.i_e; - if (src_i+1 < sorted_end.size()) { - return sorted_end[src_i+1]; - } - - return entt::null; -} - diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index 1ac4402a..2f13aca7 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -25,6 +25,7 @@ namespace Message::Components { //using FUID = FragComp::ID; struct Obj { + // message fragment's object Object o {entt::null}; };