reduce excessive message frag saving (queue dedup + waiting 10sec)

prepare for frag updates
This commit is contained in:
Green Sky 2024-03-12 11:58:23 +01:00
parent eac2927379
commit 2772c8ee69
No known key found for this signature in database
3 changed files with 89 additions and 18 deletions

View File

@ -42,10 +42,6 @@ struct FragmentStoreEventI {
virtual bool onEvent(const Fragment::Events::FragmentConstruct&) { return false; }
virtual bool onEvent(const Fragment::Events::FragmentUpdated&) { return false; }
//virtual bool onEvent(const Fragment::Events::MessageDestory&) { return false; }
// mm3
// send text
// send file path
};
using FragmentStoreEventProviderI = EventProviderI<FragmentStoreEventI>;

View File

@ -76,6 +76,11 @@ namespace Fragment::Components {
// does not contain any messges
// (recheck on frag update)
struct MessagesEmptyTag {};
// cache the contact for faster lookups
struct MessagesContactEntity {
Contact3 e {entt::null};
};
}
} // Fragment::Component
@ -246,7 +251,13 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
m.emplace_or_replace<Message::Components::FID>(fragment_id);
// in this case we know the fragment needs an update
_fuid_save_queue.push({Message::getTimeMS(), fragment_id, m.registry()});
for (const auto& it : _fuid_save_queue) {
if (it.id == fragment_id) {
// already in queue
return; // done
}
}
_fuid_save_queue.push_back({Message::getTimeMS(), fragment_id, m.registry()});
return; // done
}
@ -265,7 +276,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
if (fid_open.contains(msg_fh)) {
// TODO: dedup events
// TODO: cooldown per fragsave
_fuid_save_queue.push({Message::getTimeMS(), msg_fh, m.registry()});
_fuid_save_queue.push_back({Message::getTimeMS(), msg_fh, m.registry()});
return;
}
@ -466,6 +477,7 @@ MessageFragmentStore::MessageFragmentStore(
_fs._sc.registerDeSerializerJson<FragComp::MessagesContact>();
_fs.subscribe(this, FragmentStore_Event::fragment_construct);
_fs.subscribe(this, FragmentStore_Event::fragment_updated);
}
MessageFragmentStore::~MessageFragmentStore(void) {
@ -474,7 +486,7 @@ MessageFragmentStore::~MessageFragmentStore(void) {
auto* reg = _fuid_save_queue.front().reg;
assert(reg != nullptr);
syncFragToStorage(fh, *reg);
_fuid_save_queue.pop(); // pop unconditionally
_fuid_save_queue.pop_front(); // pop unconditionally
}
}
@ -541,14 +553,18 @@ static bool isLess(const std::vector<uint8_t>& lhs, const std::vector<uint8_t>&
return lhs.size() < rhs.size();
}
float MessageFragmentStore::tick(float time_delta) {
float MessageFragmentStore::tick(float) {
const auto ts_now = Message::getTimeMS();
// sync dirty fragments here
if (!_fuid_save_queue.empty()) {
// wait 10sec before saving
if (_fuid_save_queue.front().ts_since_dirty + 10*1000 <= ts_now) {
auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id);
auto* reg = _fuid_save_queue.front().reg;
assert(reg != nullptr);
if (syncFragToStorage(fh, *reg)) {
_fuid_save_queue.pop();
_fuid_save_queue.pop_front();
}
}
}
@ -562,7 +578,7 @@ float MessageFragmentStore::tick(float time_delta) {
std::cout << "MFS: event check\n";
auto fh = _fs.fragmentHandle(_event_check_queue.front().fid);
auto c = _event_check_queue.front().c;
_event_check_queue.pop();
_event_check_queue.pop_front();
if (!static_cast<bool>(fh)) {
return 0.05f;
@ -805,6 +821,63 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e)
// unkown contact
return false;
}
e.e.emplace_or_replace<FragComp::Ephemeral::MessagesContactEntity>(frag_contact);
}
// create if not exist
auto* msg_reg = _rmm.get(frag_contact);
if (msg_reg == nullptr) {
// msg reg not created yet
// TODO: this is an erroious path
return false;
}
if (!msg_reg->ctx().contains<Message::Components::ContactFragments>()) {
msg_reg->ctx().emplace<Message::Components::ContactFragments>();
}
msg_reg->ctx().get<Message::Components::ContactFragments>().erase(e.e); // TODO: can this happen? update
msg_reg->ctx().get<Message::Components::ContactFragments>().insert(e.e);
_event_check_queue.push_back(ECQueueEntry{e.e, frag_contact});
return false;
}
bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentUpdated& e) {
if (_fs_ignore_event) {
return false; // skip self
}
if (!e.e.all_of<FragComp::MessagesTSRange, FragComp::MessagesContact>()) {
return false; // not for us
}
// since its an update, we might have it associated, or not
// its also possible it was tagged as empty
e.e.remove<FragComp::Ephemeral::MessagesEmptyTag>();
Contact3 frag_contact = entt::null;
{ // get contact
// probably cached already
if (e.e.all_of<FragComp::Ephemeral::MessagesContactEntity>()) {
frag_contact = e.e.get<FragComp::Ephemeral::MessagesContactEntity>().e;
}
if (!_cr.valid(frag_contact)) {
const auto& frag_contact_id = e.e.get<FragComp::MessagesContact>().id;
// TODO: id lookup table, this is very inefficent
for (const auto& [c_it, id_it] : _cr.view<Contact::Components::ID>().each()) {
if (frag_contact_id == id_it.data) {
frag_contact = c_it;
break;
}
}
if (!_cr.valid(frag_contact)) {
// unkown contact
return false;
}
e.e.emplace_or_replace<FragComp::Ephemeral::MessagesContactEntity>(frag_contact);
}
}
// create if not exist
@ -821,7 +894,8 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e)
msg_reg->ctx().get<Message::Components::ContactFragments>().erase(e.e); // TODO: check/update/fragment update
msg_reg->ctx().get<Message::Components::ContactFragments>().insert(e.e);
_event_check_queue.push(ECQueueEntry{e.e, frag_contact});
// TODO: actually load it
//_event_check_queue.push_back(ECQueueEntry{e.e, frag_contact});
return false;
}

View File

@ -13,7 +13,7 @@
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
#include <queue>
#include <deque>
#include <vector>
#include <cstdint>
@ -95,13 +95,13 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS
FragmentID id;
Message3Registry* reg{nullptr};
};
std::queue<SaveQueueEntry> _fuid_save_queue;
std::deque<SaveQueueEntry> _fuid_save_queue;
struct ECQueueEntry final {
FragmentID fid;
Contact3 c;
};
std::queue<ECQueueEntry> _event_check_queue;
std::deque<ECQueueEntry> _event_check_queue;
// range changed or fragment loaded.
// we only load a limited number of fragments at once,
@ -128,5 +128,6 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS
protected: // fs
bool onEvent(const Fragment::Events::FragmentConstruct& e) override;
bool onEvent(const Fragment::Events::FragmentUpdated& e) override;
};