From 2772c8ee6944b108d86147e4d0e6aa09eb75b185 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Tue, 12 Mar 2024 11:58:23 +0100 Subject: [PATCH] reduce excessive message frag saving (queue dedup + waiting 10sec) prepare for frag updates --- src/fragment_store/fragment_store_i.hpp | 4 - src/fragment_store/message_fragment_store.cpp | 96 ++++++++++++++++--- src/fragment_store/message_fragment_store.hpp | 7 +- 3 files changed, 89 insertions(+), 18 deletions(-) diff --git a/src/fragment_store/fragment_store_i.hpp b/src/fragment_store/fragment_store_i.hpp index 8dffdf34..68d96a4d 100644 --- a/src/fragment_store/fragment_store_i.hpp +++ b/src/fragment_store/fragment_store_i.hpp @@ -42,10 +42,6 @@ struct FragmentStoreEventI { virtual bool onEvent(const Fragment::Events::FragmentConstruct&) { return false; } virtual bool onEvent(const Fragment::Events::FragmentUpdated&) { return false; } //virtual bool onEvent(const Fragment::Events::MessageDestory&) { return false; } - - // mm3 - // send text - // send file path }; using FragmentStoreEventProviderI = EventProviderI; diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index f375e609..9b666387 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -76,6 +76,11 @@ namespace Fragment::Components { // does not contain any messges // (recheck on frag update) struct MessagesEmptyTag {}; + + // cache the contact for faster lookups + struct MessagesContactEntity { + Contact3 e {entt::null}; + }; } } // Fragment::Component @@ -246,7 +251,13 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { m.emplace_or_replace(fragment_id); // in this case we know the fragment needs an update - _fuid_save_queue.push({Message::getTimeMS(), fragment_id, m.registry()}); + for (const auto& it : _fuid_save_queue) { + if (it.id == fragment_id) { + // already in queue + return; // done + } + } + _fuid_save_queue.push_back({Message::getTimeMS(), fragment_id, m.registry()}); return; // done } @@ -265,7 +276,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { if (fid_open.contains(msg_fh)) { // TODO: dedup events // TODO: cooldown per fragsave - _fuid_save_queue.push({Message::getTimeMS(), msg_fh, m.registry()}); + _fuid_save_queue.push_back({Message::getTimeMS(), msg_fh, m.registry()}); return; } @@ -466,6 +477,7 @@ MessageFragmentStore::MessageFragmentStore( _fs._sc.registerDeSerializerJson(); _fs.subscribe(this, FragmentStore_Event::fragment_construct); + _fs.subscribe(this, FragmentStore_Event::fragment_updated); } MessageFragmentStore::~MessageFragmentStore(void) { @@ -474,7 +486,7 @@ MessageFragmentStore::~MessageFragmentStore(void) { auto* reg = _fuid_save_queue.front().reg; assert(reg != nullptr); syncFragToStorage(fh, *reg); - _fuid_save_queue.pop(); // pop unconditionally + _fuid_save_queue.pop_front(); // pop unconditionally } } @@ -541,14 +553,18 @@ static bool isLess(const std::vector& lhs, const std::vector& return lhs.size() < rhs.size(); } -float MessageFragmentStore::tick(float time_delta) { +float MessageFragmentStore::tick(float) { + const auto ts_now = Message::getTimeMS(); // sync dirty fragments here if (!_fuid_save_queue.empty()) { - auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id); - auto* reg = _fuid_save_queue.front().reg; - assert(reg != nullptr); - if (syncFragToStorage(fh, *reg)) { - _fuid_save_queue.pop(); + // wait 10sec before saving + if (_fuid_save_queue.front().ts_since_dirty + 10*1000 <= ts_now) { + auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id); + auto* reg = _fuid_save_queue.front().reg; + assert(reg != nullptr); + if (syncFragToStorage(fh, *reg)) { + _fuid_save_queue.pop_front(); + } } } @@ -562,7 +578,7 @@ float MessageFragmentStore::tick(float time_delta) { std::cout << "MFS: event check\n"; auto fh = _fs.fragmentHandle(_event_check_queue.front().fid); auto c = _event_check_queue.front().c; - _event_check_queue.pop(); + _event_check_queue.pop_front(); if (!static_cast(fh)) { return 0.05f; @@ -805,6 +821,63 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) // unkown contact return false; } + e.e.emplace_or_replace(frag_contact); + } + + // create if not exist + auto* msg_reg = _rmm.get(frag_contact); + if (msg_reg == nullptr) { + // msg reg not created yet + // TODO: this is an erroious path + return false; + } + + if (!msg_reg->ctx().contains()) { + msg_reg->ctx().emplace(); + } + msg_reg->ctx().get().erase(e.e); // TODO: can this happen? update + msg_reg->ctx().get().insert(e.e); + + _event_check_queue.push_back(ECQueueEntry{e.e, frag_contact}); + + return false; +} + +bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentUpdated& e) { + if (_fs_ignore_event) { + return false; // skip self + } + + if (!e.e.all_of()) { + return false; // not for us + } + + // since its an update, we might have it associated, or not + // its also possible it was tagged as empty + e.e.remove(); + + Contact3 frag_contact = entt::null; + { // get contact + // probably cached already + if (e.e.all_of()) { + frag_contact = e.e.get().e; + } + + if (!_cr.valid(frag_contact)) { + const auto& frag_contact_id = e.e.get().id; + // TODO: id lookup table, this is very inefficent + for (const auto& [c_it, id_it] : _cr.view().each()) { + if (frag_contact_id == id_it.data) { + frag_contact = c_it; + break; + } + } + if (!_cr.valid(frag_contact)) { + // unkown contact + return false; + } + e.e.emplace_or_replace(frag_contact); + } } // create if not exist @@ -821,7 +894,8 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) msg_reg->ctx().get().erase(e.e); // TODO: check/update/fragment update msg_reg->ctx().get().insert(e.e); - _event_check_queue.push(ECQueueEntry{e.e, frag_contact}); + // TODO: actually load it + //_event_check_queue.push_back(ECQueueEntry{e.e, frag_contact}); return false; } diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index 3ad7628b..ac3813a5 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -13,7 +13,7 @@ #include #include -#include +#include #include #include @@ -95,13 +95,13 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS FragmentID id; Message3Registry* reg{nullptr}; }; - std::queue _fuid_save_queue; + std::deque _fuid_save_queue; struct ECQueueEntry final { FragmentID fid; Contact3 c; }; - std::queue _event_check_queue; + std::deque _event_check_queue; // range changed or fragment loaded. // we only load a limited number of fragments at once, @@ -128,5 +128,6 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS protected: // fs bool onEvent(const Fragment::Events::FragmentConstruct& e) override; + bool onEvent(const Fragment::Events::FragmentUpdated& e) override; };