refactor message contexts

This commit is contained in:
Green Sky 2024-04-13 11:38:13 +02:00
parent a5e67d0ee8
commit 498b4435c7
No known key found for this signature in database
5 changed files with 248 additions and 228 deletions

View File

@ -10,6 +10,8 @@ add_library(message_fragment_store
./fragment_store/message_serializer.cpp ./fragment_store/message_serializer.cpp
./fragment_store/messages_meta_components.hpp ./fragment_store/messages_meta_components.hpp
./fragment_store/messages_meta_components_id.inl ./fragment_store/messages_meta_components_id.inl
./fragment_store/internal_mfs_contexts.hpp
./fragment_store/internal_mfs_contexts.cpp
./fragment_store/message_fragment_store.hpp ./fragment_store/message_fragment_store.hpp
./fragment_store/message_fragment_store.cpp ./fragment_store/message_fragment_store.cpp

View File

@ -0,0 +1,149 @@
#include "./internal_mfs_contexts.hpp"
#include "./message_fragment_store.hpp"
#include <solanaceae/contact/components.hpp>
#include <solanaceae/message3/components.hpp>
#include <solanaceae/message3/contact_components.hpp>
#include <iostream>
static bool isLess(const std::vector<uint8_t>& lhs, const std::vector<uint8_t>& rhs) {
size_t i = 0;
for (; i < lhs.size() && i < rhs.size(); i++) {
if (lhs[i] < rhs[i]) {
return true;
} else if (lhs[i] > rhs[i]) {
return false;
}
// else continue
}
// here we have equality of common lenths
// we define smaller arrays to be less
return lhs.size() < rhs.size();
}
bool Message::Contexts::ContactFragments::insert(ObjectHandle frag) {
if (frags.contains(frag)) {
return false;
}
// both sorted arrays are sorted ascending
// so for insertion we search for the last index that is <= and insert after it
// or we search for the first > (or end) and insert before it <---
// since equal fragments are UB, we can assume they are only > or <
size_t begin_index {0};
{ // begin
const auto pos = std::find_if(
sorted_begin.cbegin(),
sorted_begin.cend(),
[frag](const Object a) -> bool {
const auto begin_a = frag.registry()->get<ObjComp::MessagesTSRange>(a).begin;
const auto begin_frag = frag.get<ObjComp::MessagesTSRange>().begin;
if (begin_a > begin_frag) {
return true;
} else if (begin_a < begin_frag) {
return false;
} else {
// equal ts, we need to fall back to id (id can not be equal)
return isLess(frag.get<ObjComp::ID>().v, frag.registry()->get<ObjComp::ID>(a).v);
}
}
);
begin_index = std::distance(sorted_begin.cbegin(), pos);
// we need to insert before pos (end is valid here)
sorted_begin.insert(pos, frag);
}
size_t end_index {0};
{ // end
const auto pos = std::find_if_not(
sorted_end.cbegin(),
sorted_end.cend(),
[frag](const Object a) -> bool {
const auto end_a = frag.registry()->get<ObjComp::MessagesTSRange>(a).end;
const auto end_frag = frag.get<ObjComp::MessagesTSRange>().end;
if (end_a > end_frag) {
return true;
} else if (end_a < end_frag) {
return false;
} else {
// equal ts, we need to fall back to id (id can not be equal)
return isLess(frag.get<ObjComp::ID>().v, frag.registry()->get<ObjComp::ID>(a).v);
}
}
);
end_index = std::distance(sorted_end.cbegin(), pos);
// we need to insert before pos (end is valid here)
sorted_end.insert(pos, frag);
}
frags.emplace(frag, InternalEntry{begin_index, end_index});
// now adjust all indicies of fragments coming after the insert position
for (size_t i = begin_index + 1; i < sorted_begin.size(); i++) {
frags.at(sorted_begin[i]).i_b = i;
}
for (size_t i = end_index + 1; i < sorted_end.size(); i++) {
frags.at(sorted_end[i]).i_e = i;
}
return true;
}
bool Message::Contexts::ContactFragments::erase(Object frag) {
auto frags_it = frags.find(frag);
if (frags_it == frags.end()) {
return false;
}
assert(sorted_begin.size() == sorted_end.size());
assert(sorted_begin.size() > frags_it->second.i_b);
sorted_begin.erase(sorted_begin.begin() + frags_it->second.i_b);
sorted_end.erase(sorted_end.begin() + frags_it->second.i_e);
frags.erase(frags_it);
return true;
}
Object Message::Contexts::ContactFragments::prev(Object frag) const {
// uses range begin to go back in time
auto it = frags.find(frag);
if (it == frags.end()) {
return entt::null;
}
const auto src_i = it->second.i_b;
if (src_i > 0) {
return sorted_begin[src_i-1];
}
return entt::null;
}
Object Message::Contexts::ContactFragments::next(Object frag) const {
// uses range end to go forward in time
auto it = frags.find(frag);
if (it == frags.end()) {
return entt::null;
}
const auto src_i = it->second.i_e;
if (src_i+1 < sorted_end.size()) {
return sorted_end[src_i+1];
}
return entt::null;
}

View File

@ -0,0 +1,53 @@
#pragma once
#include <solanaceae/object_store/object_store.hpp>
#include <entt/container/dense_set.hpp>
#include <entt/container/dense_map.hpp>
// everything assumes a single object registry (and unique objects)
namespace Message::Contexts {
// ctx
struct OpenFragments {
// only contains fragments with <1024 messages and <2h tsrage (or whatever)
entt::dense_set<Object> open_frags;
};
// all message fragments of this contact
struct ContactFragments final {
// kept up-to-date by events
struct InternalEntry {
// indecies into the sorted arrays
size_t i_b;
size_t i_e;
};
entt::dense_map<Object, InternalEntry> frags;
// add 2 sorted contact lists for both range begin and end
// TODO: adding and removing becomes expensive with enough frags, consider splitting or heap
std::vector<Object> sorted_begin;
std::vector<Object> sorted_end;
// api
// return true if it was actually inserted
bool insert(ObjectHandle frag);
bool erase(Object frag);
// update? (just erase() + insert())
// uses range begin to go back in time
Object prev(Object frag) const;
// uses range end to go forward in time
Object next(Object frag) const;
};
// all LOADED message fragments
// TODO: merge into ContactFragments (and pull in openfrags)
struct LoadedContactFragments final {
// kept up-to-date by events
entt::dense_set<Object> loaded_frags;
};
} // Message::Contexts

View File

@ -1,5 +1,7 @@
#include "./message_fragment_store.hpp" #include "./message_fragment_store.hpp"
#include "./internal_mfs_contexts.hpp"
#include <solanaceae/object_store/serializer_json.hpp> #include <solanaceae/object_store/serializer_json.hpp>
#include "../json/message_components.hpp" #include "../json/message_components.hpp"
@ -20,55 +22,7 @@
// https://youtu.be/CU2exyhYPfA // https://youtu.be/CU2exyhYPfA
// everything assumes a single fragment registry // everything assumes a single object registry (and unique objects)
namespace Message::Components {
// ctx
struct OpenFragments {
//struct OpenFrag final {
////std::vector<uint8_t> uid;
//FragmentID id;
//};
// only contains fragments with <1024 messages and <2h tsrage (or whatever)
entt::dense_set<Object> fid_open;
};
// all message fragments of this contact
struct ContactFragments final {
// kept up-to-date by events
struct InternalEntry {
// indecies into the sorted arrays
size_t i_b;
size_t i_e;
};
entt::dense_map<Object, InternalEntry> frags;
// add 2 sorted contact lists for both range begin and end
// TODO: adding and removing becomes expensive with enough frags, consider splitting or heap
std::vector<Object> sorted_begin;
std::vector<Object> sorted_end;
// api
// return true if it was actually inserted
bool insert(ObjectHandle frag);
bool erase(Object frag);
// update? (just erase() + insert())
// uses range begin to go back in time
Object prev(Object frag) const;
// uses range end to go forward in time
Object next(Object frag) const;
};
// all LOADED message fragments
// TODO: merge into ContactFragments (and pull in openfrags)
struct LoadedContactFragments final {
// kept up-to-date by events
entt::dense_set<Object> frags;
};
} // Message::Components
namespace ObjectStore::Components { namespace ObjectStore::Components {
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesVersion, v) NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesVersion, v)
@ -135,11 +89,11 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
// TODO: use fid, seving full fuid for every message consumes alot of memory (and heap frag) // TODO: use fid, seving full fuid for every message consumes alot of memory (and heap frag)
if (!m.all_of<Message::Components::Obj>()) { if (!m.all_of<Message::Components::Obj>()) {
std::cout << "MFS: new msg missing Object\n"; std::cout << "MFS: new msg missing Object\n";
if (!m.registry()->ctx().contains<Message::Components::OpenFragments>()) { if (!m.registry()->ctx().contains<Message::Contexts::OpenFragments>()) {
m.registry()->ctx().emplace<Message::Components::OpenFragments>(); m.registry()->ctx().emplace<Message::Contexts::OpenFragments>();
} }
auto& fid_open = m.registry()->ctx().get<Message::Components::OpenFragments>().fid_open; auto& fid_open = m.registry()->ctx().get<Message::Contexts::OpenFragments>().open_frags;
const auto msg_ts = m.get<Message::Components::Timestamp>().ts; const auto msg_ts = m.get<Message::Components::Timestamp>().ts;
// missing fuid // missing fuid
@ -185,10 +139,10 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
// assuming ts range exists // assuming ts range exists
fts_comp.begin = msg_ts; // extend into the past fts_comp.begin = msg_ts; // extend into the past
if (m.registry()->ctx().contains<Message::Components::ContactFragments>()) { if (m.registry()->ctx().contains<Message::Contexts::ContactFragments>()) {
// should be the case // should be the case
m.registry()->ctx().get<Message::Components::ContactFragments>().erase(fh); m.registry()->ctx().get<Message::Contexts::ContactFragments>().erase(fh);
m.registry()->ctx().get<Message::Components::ContactFragments>().insert(fh); m.registry()->ctx().get<Message::Contexts::ContactFragments>().insert(fh);
} }
@ -202,10 +156,10 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
// assuming ts range exists // assuming ts range exists
fts_comp.end = msg_ts; // extend into the future fts_comp.end = msg_ts; // extend into the future
if (m.registry()->ctx().contains<Message::Components::ContactFragments>()) { if (m.registry()->ctx().contains<Message::Contexts::ContactFragments>()) {
// should be the case // should be the case
m.registry()->ctx().get<Message::Components::ContactFragments>().erase(fh); m.registry()->ctx().get<Message::Contexts::ContactFragments>().erase(fh);
m.registry()->ctx().get<Message::Components::ContactFragments>().insert(fh); m.registry()->ctx().get<Message::Contexts::ContactFragments>().insert(fh);
} }
// TODO: check conditions for open here // TODO: check conditions for open here
@ -245,16 +199,16 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
} }
// contact frag // contact frag
if (!m.registry()->ctx().contains<Message::Components::ContactFragments>()) { if (!m.registry()->ctx().contains<Message::Contexts::ContactFragments>()) {
m.registry()->ctx().emplace<Message::Components::ContactFragments>(); m.registry()->ctx().emplace<Message::Contexts::ContactFragments>();
} }
m.registry()->ctx().get<Message::Components::ContactFragments>().insert(fh); m.registry()->ctx().get<Message::Contexts::ContactFragments>().insert(fh);
// loaded contact frag // loaded contact frag
if (!m.registry()->ctx().contains<Message::Components::LoadedContactFragments>()) { if (!m.registry()->ctx().contains<Message::Contexts::LoadedContactFragments>()) {
m.registry()->ctx().emplace<Message::Components::LoadedContactFragments>(); m.registry()->ctx().emplace<Message::Contexts::LoadedContactFragments>();
} }
m.registry()->ctx().get<Message::Components::LoadedContactFragments>().frags.emplace(fh); m.registry()->ctx().get<Message::Contexts::LoadedContactFragments>().loaded_frags.emplace(fh);
fid_open.emplace(fragment_id); fid_open.emplace(fragment_id);
@ -290,11 +244,11 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
return; // TODO: properly handle this case return; // TODO: properly handle this case
} }
if (!m.registry()->ctx().contains<Message::Components::OpenFragments>()) { if (!m.registry()->ctx().contains<Message::Contexts::OpenFragments>()) {
m.registry()->ctx().emplace<Message::Components::OpenFragments>(); m.registry()->ctx().emplace<Message::Contexts::OpenFragments>();
} }
auto& fid_open = m.registry()->ctx().get<Message::Components::OpenFragments>().fid_open; auto& fid_open = m.registry()->ctx().get<Message::Contexts::OpenFragments>().open_frags;
if (fid_open.contains(msg_fh)) { if (fid_open.contains(msg_fh)) {
// TODO: dedup events // TODO: dedup events
@ -330,16 +284,16 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, ObjectHandle fh)
} }
// TODO: this should probably never be the case, since we already know here that it is a msg frag // TODO: this should probably never be the case, since we already know here that it is a msg frag
if (!reg.ctx().contains<Message::Components::ContactFragments>()) { if (!reg.ctx().contains<Message::Contexts::ContactFragments>()) {
reg.ctx().emplace<Message::Components::ContactFragments>(); reg.ctx().emplace<Message::Contexts::ContactFragments>();
} }
reg.ctx().get<Message::Components::ContactFragments>().insert(fh); reg.ctx().get<Message::Contexts::ContactFragments>().insert(fh);
// mark loaded // mark loaded
if (!reg.ctx().contains<Message::Components::LoadedContactFragments>()) { if (!reg.ctx().contains<Message::Contexts::LoadedContactFragments>()) {
reg.ctx().emplace<Message::Components::LoadedContactFragments>(); reg.ctx().emplace<Message::Contexts::LoadedContactFragments>();
} }
reg.ctx().get<Message::Components::LoadedContactFragments>().frags.emplace(fh); reg.ctx().get<Message::Contexts::LoadedContactFragments>().loaded_frags.emplace(fh);
size_t messages_new_or_updated {0}; size_t messages_new_or_updated {0};
for (const auto& j_entry : j) { for (const auto& j_entry : j) {
@ -581,23 +535,6 @@ static bool rangeVisible(uint64_t range_begin, uint64_t range_end, const Message
return false; return false;
} }
static bool isLess(const std::vector<uint8_t>& lhs, const std::vector<uint8_t>& rhs) {
size_t i = 0;
for (; i < lhs.size() && i < rhs.size(); i++) {
if (lhs[i] < rhs[i]) {
return true;
} else if (lhs[i] > rhs[i]) {
return false;
}
// else continue
}
// here we have equality of common lenths
// we define smaller arrays to be less
return lhs.size() < rhs.size();
}
float MessageFragmentStore::tick(float) { float MessageFragmentStore::tick(float) {
const auto ts_now = Message::getTimeMS(); const auto ts_now = Message::getTimeMS();
// sync dirty fragments here // sync dirty fragments here
@ -673,15 +610,15 @@ float MessageFragmentStore::tick(float) {
// first do collision check agains every contact associated fragment // first do collision check agains every contact associated fragment
// that is not already loaded !! // that is not already loaded !!
if (msg_reg->ctx().contains<Message::Components::ContactFragments>()) { if (msg_reg->ctx().contains<Message::Contexts::ContactFragments>()) {
const auto& cf = msg_reg->ctx().get<Message::Components::ContactFragments>(); const auto& cf = msg_reg->ctx().get<Message::Contexts::ContactFragments>();
if (!cf.frags.empty()) { if (!cf.frags.empty()) {
if (!msg_reg->ctx().contains<Message::Components::LoadedContactFragments>()) { if (!msg_reg->ctx().contains<Message::Contexts::LoadedContactFragments>()) {
msg_reg->ctx().emplace<Message::Components::LoadedContactFragments>(); msg_reg->ctx().emplace<Message::Contexts::LoadedContactFragments>();
} }
const auto& loaded_frags = msg_reg->ctx().get<Message::Components::LoadedContactFragments>().frags; const auto& loaded_frags = msg_reg->ctx().get<Message::Contexts::LoadedContactFragments>().loaded_frags;
for (const auto& [fid, si] : msg_reg->ctx().get<Message::Components::ContactFragments>().frags) { for (const auto& [fid, si] : msg_reg->ctx().get<Message::Contexts::ContactFragments>().frags) {
if (loaded_frags.contains(fid)) { if (loaded_frags.contains(fid)) {
continue; continue;
} }
@ -691,14 +628,14 @@ float MessageFragmentStore::tick(float) {
if (!static_cast<bool>(fh)) { if (!static_cast<bool>(fh)) {
std::cerr << "MFS error: frag is invalid\n"; std::cerr << "MFS error: frag is invalid\n";
// WHAT // WHAT
msg_reg->ctx().get<Message::Components::ContactFragments>().erase(fid); msg_reg->ctx().get<Message::Contexts::ContactFragments>().erase(fid);
return 0.05f; return 0.05f;
} }
if (!fh.all_of<ObjComp::MessagesTSRange>()) { if (!fh.all_of<ObjComp::MessagesTSRange>()) {
std::cerr << "MFS error: frag has no range\n"; std::cerr << "MFS error: frag has no range\n";
// ???? // ????
msg_reg->ctx().get<Message::Components::ContactFragments>().erase(fid); msg_reg->ctx().get<Message::Contexts::ContactFragments>().erase(fid);
return 0.05f; return 0.05f;
} }
@ -887,11 +824,11 @@ bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectConstruct& e
return false; return false;
} }
if (!msg_reg->ctx().contains<Message::Components::ContactFragments>()) { if (!msg_reg->ctx().contains<Message::Contexts::ContactFragments>()) {
msg_reg->ctx().emplace<Message::Components::ContactFragments>(); msg_reg->ctx().emplace<Message::Contexts::ContactFragments>();
} }
msg_reg->ctx().get<Message::Components::ContactFragments>().erase(e.e); // TODO: can this happen? update msg_reg->ctx().get<Message::Contexts::ContactFragments>().erase(e.e); // TODO: can this happen? update
msg_reg->ctx().get<Message::Components::ContactFragments>().insert(e.e); msg_reg->ctx().get<Message::Contexts::ContactFragments>().insert(e.e);
_event_check_queue.push_back(ECQueueEntry{e.e, frag_contact}); _event_check_queue.push_back(ECQueueEntry{e.e, frag_contact});
@ -943,11 +880,11 @@ bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
return false; return false;
} }
if (!msg_reg->ctx().contains<Message::Components::ContactFragments>()) { if (!msg_reg->ctx().contains<Message::Contexts::ContactFragments>()) {
msg_reg->ctx().emplace<Message::Components::ContactFragments>(); msg_reg->ctx().emplace<Message::Contexts::ContactFragments>();
} }
msg_reg->ctx().get<Message::Components::ContactFragments>().erase(e.e); // TODO: check/update/fragment update msg_reg->ctx().get<Message::Contexts::ContactFragments>().erase(e.e); // TODO: check/update/fragment update
msg_reg->ctx().get<Message::Components::ContactFragments>().insert(e.e); msg_reg->ctx().get<Message::Contexts::ContactFragments>().insert(e.e);
// TODO: actually load it // TODO: actually load it
//_event_check_queue.push_back(ECQueueEntry{e.e, frag_contact}); //_event_check_queue.push_back(ECQueueEntry{e.e, frag_contact});
@ -955,125 +892,3 @@ bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
return false; return false;
} }
bool Message::Components::ContactFragments::insert(ObjectHandle frag) {
if (frags.contains(frag)) {
return false;
}
// both sorted arrays are sorted ascending
// so for insertion we search for the last index that is <= and insert after it
// or we search for the first > (or end) and insert before it <---
// since equal fragments are UB, we can assume they are only > or <
size_t begin_index {0};
{ // begin
const auto pos = std::find_if(
sorted_begin.cbegin(),
sorted_begin.cend(),
[frag](const Object a) -> bool {
const auto begin_a = frag.registry()->get<ObjComp::MessagesTSRange>(a).begin;
const auto begin_frag = frag.get<ObjComp::MessagesTSRange>().begin;
if (begin_a > begin_frag) {
return true;
} else if (begin_a < begin_frag) {
return false;
} else {
// equal ts, we need to fall back to id (id can not be equal)
return isLess(frag.get<ObjComp::ID>().v, frag.registry()->get<ObjComp::ID>(a).v);
}
}
);
begin_index = std::distance(sorted_begin.cbegin(), pos);
// we need to insert before pos (end is valid here)
sorted_begin.insert(pos, frag);
}
size_t end_index {0};
{ // end
const auto pos = std::find_if_not(
sorted_end.cbegin(),
sorted_end.cend(),
[frag](const Object a) -> bool {
const auto end_a = frag.registry()->get<ObjComp::MessagesTSRange>(a).end;
const auto end_frag = frag.get<ObjComp::MessagesTSRange>().end;
if (end_a > end_frag) {
return true;
} else if (end_a < end_frag) {
return false;
} else {
// equal ts, we need to fall back to id (id can not be equal)
return isLess(frag.get<ObjComp::ID>().v, frag.registry()->get<ObjComp::ID>(a).v);
}
}
);
end_index = std::distance(sorted_end.cbegin(), pos);
// we need to insert before pos (end is valid here)
sorted_end.insert(pos, frag);
}
frags.emplace(frag, InternalEntry{begin_index, end_index});
// now adjust all indicies of fragments coming after the insert position
for (size_t i = begin_index + 1; i < sorted_begin.size(); i++) {
frags.at(sorted_begin[i]).i_b = i;
}
for (size_t i = end_index + 1; i < sorted_end.size(); i++) {
frags.at(sorted_end[i]).i_e = i;
}
return true;
}
bool Message::Components::ContactFragments::erase(Object frag) {
auto frags_it = frags.find(frag);
if (frags_it == frags.end()) {
return false;
}
assert(sorted_begin.size() == sorted_end.size());
assert(sorted_begin.size() > frags_it->second.i_b);
sorted_begin.erase(sorted_begin.begin() + frags_it->second.i_b);
sorted_end.erase(sorted_end.begin() + frags_it->second.i_e);
frags.erase(frags_it);
return true;
}
Object Message::Components::ContactFragments::prev(Object frag) const {
// uses range begin to go back in time
auto it = frags.find(frag);
if (it == frags.end()) {
return entt::null;
}
const auto src_i = it->second.i_b;
if (src_i > 0) {
return sorted_begin[src_i-1];
}
return entt::null;
}
Object Message::Components::ContactFragments::next(Object frag) const {
// uses range end to go forward in time
auto it = frags.find(frag);
if (it == frags.end()) {
return entt::null;
}
const auto src_i = it->second.i_e;
if (src_i+1 < sorted_end.size()) {
return sorted_end[src_i+1];
}
return entt::null;
}

View File

@ -25,6 +25,7 @@ namespace Message::Components {
//using FUID = FragComp::ID; //using FUID = FragComp::ID;
struct Obj { struct Obj {
// message fragment's object
Object o {entt::null}; Object o {entt::null};
}; };