load based on view cursers (untested and not used yet)

This commit is contained in:
Green Sky 2024-02-22 23:07:59 +01:00
parent e442191aad
commit 22f2c8f514
No known key found for this signature in database
3 changed files with 225 additions and 18 deletions

View File

@ -684,7 +684,7 @@ size_t FragmentStore::scanStoragePath(std::string_view path) {
} }
} }
size_t count {0}; std::vector<FragmentID> scanned_frags;
// step 3: parse meta and insert into reg of non preexising // step 3: parse meta and insert into reg of non preexising
// main thread // main thread
// TODO: check timestamps of preexisting and reload? mark external/remote dirty? // TODO: check timestamps of preexisting and reload? mark external/remote dirty?
@ -749,8 +749,8 @@ size_t FragmentStore::scanStoragePath(std::string_view path) {
meta_data_decomp.resize(ZSTD_DStreamOutSize()); meta_data_decomp.resize(ZSTD_DStreamOutSize());
ZSTD_DCtx* const dctx = ZSTD_createDCtx(); ZSTD_DCtx* const dctx = ZSTD_createDCtx();
ZSTD_inBuffer input {meta_data_ref.data(), meta_data_ref.size(), 0 }; ZSTD_inBuffer input {meta_data_ref.data(), meta_data_ref.size(), 0};
ZSTD_outBuffer output = { meta_data_decomp.data(), meta_data_decomp.size(), 0 }; ZSTD_outBuffer output = {meta_data_decomp.data(), meta_data_decomp.size(), 0};
do { do {
size_t const ret = ZSTD_decompressStream(dctx, &output , &input); size_t const ret = ZSTD_decompressStream(dctx, &output , &input);
if (ZSTD_isError(ret)) { if (ZSTD_isError(ret)) {
@ -809,12 +809,17 @@ size_t FragmentStore::scanStoragePath(std::string_view path) {
std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n"; std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n";
} }
} }
// throw new frag event here scanned_frags.push_back(fh);
throwEventConstruct(fh);
count++;
} }
return count; // TODO: mutex and move code to async and return this list ?
// throw new frag event here, after loading them all
for (const FragmentID fid : scanned_frags) {
throwEventConstruct(fid);
}
return scanned_frags.size();
} }
void FragmentStore::scanStoragePathAsync(std::string path) { void FragmentStore::scanStoragePathAsync(std::string path) {

View File

@ -29,6 +29,18 @@ namespace Message::Components {
std::vector<OpenFrag> fuid_open; std::vector<OpenFrag> fuid_open;
}; };
// all message fragments of this contact
struct ContactFragments final {
// kept up-to-date by events
entt::dense_set<FragmentID> frags;
};
// all LOADED message fragments
struct LoadedContactFragments final {
// kept up-to-date by events
entt::dense_set<FragmentID> frags;
};
} // Message::Components } // Message::Components
namespace Fragment::Components { namespace Fragment::Components {
@ -75,6 +87,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
// registry is the same as the one the message event is for // registry is the same as the one the message event is for
if (static_cast<const RegistryMessageModel&>(_rmm).get(frag_contact) == m.registry()) { if (static_cast<const RegistryMessageModel&>(_rmm).get(frag_contact) == m.registry()) {
// TODO: make dirty instead (they already are)
loadFragment(*m.registry(), FragmentHandle{_fs._reg, fid}); loadFragment(*m.registry(), FragmentHandle{_fs._reg, fid});
} }
} }
@ -163,6 +176,18 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
new_ts_range.begin = msg_ts; new_ts_range.begin = msg_ts;
new_ts_range.end = msg_ts; new_ts_range.end = msg_ts;
// contact frag
if (!m.registry()->ctx().contains<Message::Components::ContactFragments>()) {
m.registry()->ctx().emplace<Message::Components::ContactFragments>();
}
m.registry()->ctx().get<Message::Components::ContactFragments>().frags.emplace(fh);
// loaded contact frag
if (!m.registry()->ctx().contains<Message::Components::LoadedContactFragments>()) {
m.registry()->ctx().emplace<Message::Components::LoadedContactFragments>();
}
m.registry()->ctx().get<Message::Components::LoadedContactFragments>().frags.emplace(fh);
{ {
const auto msg_reg_contact = m.registry()->ctx().get<Contact3>(); const auto msg_reg_contact = m.registry()->ctx().get<Contact3>();
if (_cr.all_of<Contact::Components::ID>(msg_reg_contact)) { if (_cr.all_of<Contact::Components::ID>(msg_reg_contact)) {
@ -199,7 +224,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
// on new and update: mark as fragment dirty // on new and update: mark as fragment dirty
} }
// assumes new frag // assumes not loaded frag
// need update from frag // need update from frag
void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh) { void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh) {
std::cout << "MFS: loadFragment\n"; std::cout << "MFS: loadFragment\n";
@ -210,6 +235,18 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh
return; return;
} }
// TODO: this should probably never be the case, since we already know here that it is a msg frag
if (!reg.ctx().contains<Message::Components::ContactFragments>()) {
reg.ctx().emplace<Message::Components::ContactFragments>();
}
reg.ctx().get<Message::Components::ContactFragments>().frags.emplace(fh);
// mark loaded
if (!reg.ctx().contains<Message::Components::LoadedContactFragments>()) {
reg.ctx().emplace<Message::Components::LoadedContactFragments>();
}
reg.ctx().get<Message::Components::LoadedContactFragments>().frags.emplace(fh);
for (const auto& j_entry : j) { for (const auto& j_entry : j) {
auto new_real_msg = Message3Handle{reg, reg.create()}; auto new_real_msg = Message3Handle{reg, reg.create()};
// load into staging reg // load into staging reg
@ -301,9 +338,50 @@ MessageSerializerCallbacks& MessageFragmentStore::getMSC(void) {
return _sc; return _sc;
} }
// checks range against all cursers in msgreg
static bool rangeVisible(uint64_t range_begin, uint64_t range_end, const Message3Registry& msg_reg) {
// 1D collision checks:
// - for range vs range:
// r1 rhs >= r0 lhs AND r1 lhs <= r0 rhs
// - for range vs point:
// p >= r0 lhs AND p <= r0 rhs
// NOTE: directions for us are reversed (begin has larger values as end)
auto c_b_view = msg_reg.view<Message::Components::Timestamp, Message::Components::ViewCurserBegin>();
c_b_view.use<Message::Components::ViewCurserBegin>();
for (const auto& [m, ts_begin_comp, vcb] : c_b_view.each()) {
// p and r1 rhs can be seen as the same
// but first we need to know if a curser begin is a point or a range
// TODO: margin?
auto ts_begin = ts_begin_comp.ts;
auto ts_end = ts_begin_comp.ts; // simplyfy code by making a single begin curser act as an infinitly small range
if (msg_reg.valid(vcb.curser_end) && msg_reg.all_of<Message::Components::ViewCurserEnd>(vcb.curser_end)) {
// TODO: respect curser end's begin?
// TODO: remember which ends we checked and check remaining
ts_end = msg_reg.get<Message::Components::Timestamp>(vcb.curser_end).ts;
// sanity check curser order
if (ts_end > ts_begin) {
std::cerr << "MFS warning: begin curser and end curser of view swapped!!\n";
std::swap(ts_begin, ts_end);
}
}
// perform both checks here
if (ts_begin < range_end || ts_end > range_begin) {
continue;
}
// range hits a view
return true;
}
return false;
}
float MessageFragmentStore::tick(float time_delta) { float MessageFragmentStore::tick(float time_delta) {
// sync dirty fragments here // sync dirty fragments here
if (!_fuid_save_queue.empty()) { if (!_fuid_save_queue.empty()) {
const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id); const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id);
auto* reg = _fuid_save_queue.front().reg; auto* reg = _fuid_save_queue.front().reg;
@ -373,6 +451,92 @@ float MessageFragmentStore::tick(float time_delta) {
} }
} }
// load needed fragments here
// last check event frags
// only checks if it collides with ranges, not adjacency ???
// bc ~range~ msgreg will be marked dirty and checked next tick
if (!_event_check_queue.empty()) {
auto fh = _fs.fragmentHandle(_event_check_queue.front().fid);
auto c = _event_check_queue.front().c;
_event_check_queue.pop();
if (!static_cast<bool>(fh)) {
return 0.05f;
}
if (!fh.all_of<FragComp::MessagesTSRange>()) {
return 0.05f;
}
// get ts range of frag and collide with all curser(s/ranges)
const auto& frag_range = fh.get<FragComp::MessagesTSRange>();
auto* msg_reg = _rmm.get(c);
if (msg_reg == nullptr) {
return 0.05f;
}
if (rangeVisible(frag_range.begin, frag_range.end, !msg_reg)) {
loadFragment(*msg_reg, fh);
_potentially_dirty_contacts.emplace(c);
}
return 0.05f; // only one but soon again
}
if (!_potentially_dirty_contacts.empty()) {
// here we check if any view of said contact needs frag loading
// only once per tick tho
// TODO: this makes order depend on internal order and is not fair
auto it = _potentially_dirty_contacts.cbegin();
auto* msg_reg = _rmm.get(*it);
// first do collision check agains every contact associated fragment
// that is not already loaded !!
if (msg_reg->ctx().contains<Message::Components::ContactFragments>()) {
for (const FragmentID fid : msg_reg->ctx().get<Message::Components::ContactFragments>().frags) {
// TODO: better ctx caching code?
if (msg_reg->ctx().contains<Message::Components::LoadedContactFragments>()) {
if (msg_reg->ctx().get<Message::Components::LoadedContactFragments>().frags.contains(fid)) {
continue;
}
}
auto fh = _fs.fragmentHandle(fid);
if (!static_cast<bool>(fh)) {
return 0.05f;
}
if (!fh.all_of<FragComp::MessagesTSRange>()) {
return 0.05f;
}
// get ts range of frag and collide with all curser(s/ranges)
const auto& [range_begin, range_end] = fh.get<FragComp::MessagesTSRange>();
if (rangeVisible(range_begin, range_end, *msg_reg)) {
loadFragment(*msg_reg, fh);
return 0.05f;
}
}
// no new visible fragment
// now, finally, check for adjecent fragments that need to be loaded
// we do this by finding a fragment in a rage
} else {
// contact has no fragments, skip
}
_potentially_dirty_contacts.erase(it);
return 0.05f;
}
return 1000.f*60.f*60.f; return 1000.f*60.f*60.f;
} }
@ -531,8 +695,6 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e)
// TODO: are we sure it is a *new* fragment? // TODO: are we sure it is a *new* fragment?
//std::cout << "MFS: got frag for us!\n";
Contact3 frag_contact = entt::null; Contact3 frag_contact = entt::null;
{ // get contact { // get contact
const auto& frag_contact_id = e.e.get<FragComp::MessagesContact>().id; const auto& frag_contact_id = e.e.get<FragComp::MessagesContact>().id;
@ -551,15 +713,19 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e)
} }
} }
// only load if msg reg open // create if not exist
auto* msg_reg = static_cast<const RegistryMessageModel&>(_rmm).get(frag_contact); auto* msg_reg = _rmm.get(frag_contact);
if (msg_reg == nullptr) { if (msg_reg == nullptr) {
// msg reg not created yet // msg reg not created yet
return false; return false;
} }
// TODO: should this be done async / on tick() instead of on event? if (!msg_reg->ctx().contains<Message::Components::ContactFragments>()) {
loadFragment(*msg_reg, e.e); msg_reg->ctx().emplace<Message::Components::ContactFragments>();
}
msg_reg->ctx().get<Message::Components::ContactFragments>().frags.emplace(e.e);
_event_check_queue.push(ECQueueEntry{e.e, frag_contact});
return false; return false;
} }

View File

@ -8,6 +8,7 @@
#include <entt/entity/registry.hpp> #include <entt/entity/registry.hpp>
#include <entt/container/dense_map.hpp> #include <entt/container/dense_map.hpp>
#include <entt/container/dense_set.hpp>
#include <solanaceae/contact/contact_model3.hpp> #include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/message3/registry_message_model.hpp> #include <solanaceae/message3/registry_message_model.hpp>
@ -20,16 +21,40 @@ namespace Message::Components {
using FUID = FragComp::ID; using FUID = FragComp::ID;
// unused
struct FID { struct FID {
FragmentID fid {entt::null}; FragmentID fid {entt::null};
}; };
// points to the front/newer message
// together they define a range that is,
// eg the first(end) and last(begin) message being rendered
// MFS requires there to be atleast one other message after/before,
// if not loaded fragment with fitting tsrange(direction) available
// uses fragmentAfter/Before()
// they can exist standalone
// if they are a pair, the inside is filled first
// cursers require a timestamp ???
struct ViewCurserBegin {
Message3 curser_end{entt::null};
};
struct ViewCurserEnd {
Message3 curser_begin{entt::null};
};
// mfs will only load a limited number of fragments per tick (1),
// so this tag will be set if we loaded a fragment and
// every tick we check all cursers for this tag and continue
// and remove once no fragment could be loaded anymore
// (internal)
struct TagCurserUnsatisfied {};
} // Message::Components } // Message::Components
namespace Fragment::Components { namespace Fragment::Components {
struct MessagesTSRange { struct MessagesTSRange {
// timestamp range within the fragment // timestamp range within the fragment
uint64_t begin {0}; uint64_t begin {0}; // newer msg -> higher number
uint64_t end {0}; uint64_t end {0};
}; };
@ -59,12 +84,23 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS
void loadFragment(Message3Registry& reg, FragmentHandle fh); void loadFragment(Message3Registry& reg, FragmentHandle fh);
struct QueueEntry final { struct SaveQueueEntry final {
uint64_t ts_since_dirty{0}; uint64_t ts_since_dirty{0};
std::vector<uint8_t> id; std::vector<uint8_t> id;
Message3Registry* reg{nullptr}; Message3Registry* reg{nullptr};
}; };
std::queue<QueueEntry> _fuid_save_queue; std::queue<SaveQueueEntry> _fuid_save_queue;
struct ECQueueEntry final {
FragmentID fid;
Contact3 c;
};
std::queue<ECQueueEntry> _event_check_queue;
// range changed or fragment loaded.
// we only load a limited number of fragments at once,
// so we need to keep them dirty until nothing was loaded.
entt::dense_set<Contact3> _potentially_dirty_contacts;
public: public:
MessageFragmentStore( MessageFragmentStore(