diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index 11124ed..f026780 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -10,6 +10,7 @@ #include +#include #include #include #include @@ -17,6 +18,8 @@ // https://youtu.be/CU2exyhYPfA +// everything assumes a single fragment registry + namespace Message::Components { // ctx @@ -32,9 +35,23 @@ namespace Message::Components { // all message fragments of this contact struct ContactFragments final { // kept up-to-date by events - entt::dense_set frags; + //entt::dense_set frags; + struct InternalEntry { + // indecies into the sorted arrays + size_t i_b; + size_t i_e; + }; + entt::dense_map frags; // add 2 sorted contact lists for both range begin and end + std::vector sorted_begin; + std::vector sorted_end; + + // api + // return true if it was actually inserted + bool insert(FragmentHandle frag); + bool erase(FragmentID frag); + // update? (just erase() + insert()) }; // all LOADED message fragments @@ -78,12 +95,13 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { if (!m.registry()->ctx().contains()) { m.registry()->ctx().emplace(); +#if 0 // TODO: move this to async // TODO: move this to tick and just respect the dirty FragmentHandle most_recent_fag; uint64_t most_recent_ts{0}; if (m.registry()->ctx().contains()) { - for (const auto fid : m.registry()->ctx().get().frags) { + for (const auto& [fid, si] : m.registry()->ctx().get().frags) { auto fh = _fs.fragmentHandle(fid); if (!static_cast(fh) || !fh.all_of()) { // TODO: remove at this point? @@ -92,6 +110,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { const uint64_t f_ts = fh.get().begin; if (f_ts > most_recent_ts) { + // this makes no sense, we retry to load the first fragment on every new message and bail here, bc it was already if (m.registry()->ctx().contains()) { if (m.registry()->ctx().get().frags.contains(fh)) { continue; // already loaded @@ -106,6 +125,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { if (static_cast(most_recent_fag)) { loadFragment(*m.registry(), most_recent_fag); } +#endif } auto& fuid_open = m.registry()->ctx().get().fuid_open; @@ -193,7 +213,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { if (!m.registry()->ctx().contains()) { m.registry()->ctx().emplace(); } - m.registry()->ctx().get().frags.emplace(fh); + m.registry()->ctx().get().insert(fh); // loaded contact frag if (!m.registry()->ctx().contains()) { @@ -252,7 +272,7 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh if (!reg.ctx().contains()) { reg.ctx().emplace(); } - reg.ctx().get().frags.emplace(fh); + reg.ctx().get().insert(fh); // mark loaded if (!reg.ctx().contains()) { @@ -535,7 +555,7 @@ float MessageFragmentStore::tick(float time_delta) { } const auto& loaded_frags = msg_reg->ctx().get().frags; - for (const FragmentID fid : msg_reg->ctx().get().frags) { + for (const auto& [fid, si] : msg_reg->ctx().get().frags) { if (loaded_frags.contains(fid)) { continue; } @@ -545,14 +565,14 @@ float MessageFragmentStore::tick(float time_delta) { if (!static_cast(fh)) { std::cerr << "MFS error: frag is invalid\n"; // WHAT - msg_reg->ctx().get().frags.erase(fid); + msg_reg->ctx().get().erase(fid); return 0.05f; } if (!fh.all_of()) { std::cerr << "MFS error: frag has no range\n"; // ???? - msg_reg->ctx().get().frags.erase(fid); + msg_reg->ctx().get().erase(fid); return 0.05f; } @@ -585,7 +605,7 @@ float MessageFragmentStore::tick(float time_delta) { uint64_t frag_oldest_ts {}; // find newest frag in range - for (const FragmentID fid : msg_reg->ctx().get().frags) { + for (const auto& [fid, si] : msg_reg->ctx().get().frags) { // we want to find the last and first fragment of the range (if not all hits are loaded, we did something wrong) if (!loaded_frags.contains(fid)) { continue; @@ -867,10 +887,92 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) if (!msg_reg->ctx().contains()) { msg_reg->ctx().emplace(); } - msg_reg->ctx().get().frags.emplace(e.e); + msg_reg->ctx().get().insert(e.e); _event_check_queue.push(ECQueueEntry{e.e, frag_contact}); return false; } +bool Message::Components::ContactFragments::insert(FragmentHandle frag) { + if (frags.contains(frag)) { + return false; + } + + // both sorted arrays are sorted ascending + // so for insertion we search for the last index that is <= and insert after it + // or we search for the first > (or end) and insert before it <--- + // since equal fragments are UB, we can assume they are only > or < + + size_t begin_index {0}; + { // begin + const auto pos = std::find_if( + sorted_begin.cbegin(), + sorted_begin.cend(), + [frag](const FragmentID a) -> bool { + const auto begin_a = frag.registry()->get(a).begin; + const auto begin_frag = frag.get().begin; + if (begin_a > begin_frag) { + return true; + } else if (begin_a < begin_frag) { + return false; + } else { + // equal ts, we need to fall back to id (id can not be equal) + return isLess(frag.get().v, frag.registry()->get(a).v); + } + } + ); + + begin_index = std::distance(sorted_begin.cbegin(), pos); + + // we need to insert before pos (end is valid here) + sorted_begin.insert(pos, frag); + } + + size_t end_index {0}; + { // end + const auto pos = std::find_if_not( + sorted_end.cbegin(), + sorted_end.cend(), + [frag](const FragmentID a) -> bool { + const auto end_a = frag.registry()->get(a).end; + const auto end_frag = frag.get().end; + if (end_a > end_frag) { + return true; + } else if (end_a < end_frag) { + return false; + } else { + // equal ts, we need to fall back to id (id can not be equal) + return isLess(frag.get().v, frag.registry()->get(a).v); + } + } + ); + + end_index = std::distance(sorted_end.cbegin(), pos); + + // we need to insert before pos (end is valid here) + sorted_end.insert(pos, frag); + } + + frags.emplace(frag, InternalEntry{begin_index, end_index}); + + return true; +} + +bool Message::Components::ContactFragments::erase(FragmentID frag) { + auto frags_it = frags.find(frag); + if (frags_it == frags.end()) { + return false; + } + + assert(sorted_begin.size() == sorted_end.size()); + assert(sorted_begin.size() > frags_it->second.i_b); + + sorted_begin.erase(sorted_begin.begin() + frags_it->second.i_b); + sorted_end.erase(sorted_end.begin() + frags_it->second.i_e); + + frags.erase(frags_it); + + return true; +} +