From 22f2c8f5145da0edd682c954c87c4f043ea9402f Mon Sep 17 00:00:00 2001 From: Green Sky Date: Thu, 22 Feb 2024 23:07:59 +0100 Subject: [PATCH] load based on view cursers (untested and not used yet) --- src/fragment_store/fragment_store.cpp | 19 +- src/fragment_store/message_fragment_store.cpp | 182 +++++++++++++++++- src/fragment_store/message_fragment_store.hpp | 42 +++- 3 files changed, 225 insertions(+), 18 deletions(-) diff --git a/src/fragment_store/fragment_store.cpp b/src/fragment_store/fragment_store.cpp index 932c079..6397e73 100644 --- a/src/fragment_store/fragment_store.cpp +++ b/src/fragment_store/fragment_store.cpp @@ -684,7 +684,7 @@ size_t FragmentStore::scanStoragePath(std::string_view path) { } } - size_t count {0}; + std::vector scanned_frags; // step 3: parse meta and insert into reg of non preexising // main thread // TODO: check timestamps of preexisting and reload? mark external/remote dirty? @@ -749,8 +749,8 @@ size_t FragmentStore::scanStoragePath(std::string_view path) { meta_data_decomp.resize(ZSTD_DStreamOutSize()); ZSTD_DCtx* const dctx = ZSTD_createDCtx(); - ZSTD_inBuffer input {meta_data_ref.data(), meta_data_ref.size(), 0 }; - ZSTD_outBuffer output = { meta_data_decomp.data(), meta_data_decomp.size(), 0 }; + ZSTD_inBuffer input {meta_data_ref.data(), meta_data_ref.size(), 0}; + ZSTD_outBuffer output = {meta_data_decomp.data(), meta_data_decomp.size(), 0}; do { size_t const ret = ZSTD_decompressStream(dctx, &output , &input); if (ZSTD_isError(ret)) { @@ -809,12 +809,17 @@ size_t FragmentStore::scanStoragePath(std::string_view path) { std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n"; } } - // throw new frag event here - throwEventConstruct(fh); - count++; + scanned_frags.push_back(fh); } - return count; + // TODO: mutex and move code to async and return this list ? + + // throw new frag event here, after loading them all + for (const FragmentID fid : scanned_frags) { + throwEventConstruct(fid); + } + + return scanned_frags.size(); } void FragmentStore::scanStoragePathAsync(std::string path) { diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index c18aad9..e6a2117 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -29,6 +29,18 @@ namespace Message::Components { std::vector fuid_open; }; + // all message fragments of this contact + struct ContactFragments final { + // kept up-to-date by events + entt::dense_set frags; + }; + + // all LOADED message fragments + struct LoadedContactFragments final { + // kept up-to-date by events + entt::dense_set frags; + }; + } // Message::Components namespace Fragment::Components { @@ -75,6 +87,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // registry is the same as the one the message event is for if (static_cast(_rmm).get(frag_contact) == m.registry()) { + // TODO: make dirty instead (they already are) loadFragment(*m.registry(), FragmentHandle{_fs._reg, fid}); } } @@ -163,6 +176,18 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { new_ts_range.begin = msg_ts; new_ts_range.end = msg_ts; + // contact frag + if (!m.registry()->ctx().contains()) { + m.registry()->ctx().emplace(); + } + m.registry()->ctx().get().frags.emplace(fh); + + // loaded contact frag + if (!m.registry()->ctx().contains()) { + m.registry()->ctx().emplace(); + } + m.registry()->ctx().get().frags.emplace(fh); + { const auto msg_reg_contact = m.registry()->ctx().get(); if (_cr.all_of(msg_reg_contact)) { @@ -199,7 +224,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // on new and update: mark as fragment dirty } -// assumes new frag +// assumes not loaded frag // need update from frag void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh) { std::cout << "MFS: loadFragment\n"; @@ -210,6 +235,18 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh return; } + // TODO: this should probably never be the case, since we already know here that it is a msg frag + if (!reg.ctx().contains()) { + reg.ctx().emplace(); + } + reg.ctx().get().frags.emplace(fh); + + // mark loaded + if (!reg.ctx().contains()) { + reg.ctx().emplace(); + } + reg.ctx().get().frags.emplace(fh); + for (const auto& j_entry : j) { auto new_real_msg = Message3Handle{reg, reg.create()}; // load into staging reg @@ -301,9 +338,50 @@ MessageSerializerCallbacks& MessageFragmentStore::getMSC(void) { return _sc; } +// checks range against all cursers in msgreg +static bool rangeVisible(uint64_t range_begin, uint64_t range_end, const Message3Registry& msg_reg) { + // 1D collision checks: + // - for range vs range: + // r1 rhs >= r0 lhs AND r1 lhs <= r0 rhs + // - for range vs point: + // p >= r0 lhs AND p <= r0 rhs + // NOTE: directions for us are reversed (begin has larger values as end) + + auto c_b_view = msg_reg.view(); + c_b_view.use(); + for (const auto& [m, ts_begin_comp, vcb] : c_b_view.each()) { + // p and r1 rhs can be seen as the same + // but first we need to know if a curser begin is a point or a range + + // TODO: margin? + auto ts_begin = ts_begin_comp.ts; + auto ts_end = ts_begin_comp.ts; // simplyfy code by making a single begin curser act as an infinitly small range + if (msg_reg.valid(vcb.curser_end) && msg_reg.all_of(vcb.curser_end)) { + // TODO: respect curser end's begin? + // TODO: remember which ends we checked and check remaining + ts_end = msg_reg.get(vcb.curser_end).ts; + + // sanity check curser order + if (ts_end > ts_begin) { + std::cerr << "MFS warning: begin curser and end curser of view swapped!!\n"; + std::swap(ts_begin, ts_end); + } + } + + // perform both checks here + if (ts_begin < range_end || ts_end > range_begin) { + continue; + } + + // range hits a view + return true; + } + + return false; +} + float MessageFragmentStore::tick(float time_delta) { // sync dirty fragments here - if (!_fuid_save_queue.empty()) { const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id); auto* reg = _fuid_save_queue.front().reg; @@ -373,6 +451,92 @@ float MessageFragmentStore::tick(float time_delta) { } } + // load needed fragments here + + // last check event frags + // only checks if it collides with ranges, not adjacency ??? + // bc ~range~ msgreg will be marked dirty and checked next tick + if (!_event_check_queue.empty()) { + auto fh = _fs.fragmentHandle(_event_check_queue.front().fid); + auto c = _event_check_queue.front().c; + _event_check_queue.pop(); + + if (!static_cast(fh)) { + return 0.05f; + } + + if (!fh.all_of()) { + return 0.05f; + } + + // get ts range of frag and collide with all curser(s/ranges) + const auto& frag_range = fh.get(); + + auto* msg_reg = _rmm.get(c); + if (msg_reg == nullptr) { + return 0.05f; + } + + if (rangeVisible(frag_range.begin, frag_range.end, !msg_reg)) { + loadFragment(*msg_reg, fh); + _potentially_dirty_contacts.emplace(c); + } + + return 0.05f; // only one but soon again + } + + if (!_potentially_dirty_contacts.empty()) { + // here we check if any view of said contact needs frag loading + // only once per tick tho + + // TODO: this makes order depend on internal order and is not fair + auto it = _potentially_dirty_contacts.cbegin(); + + auto* msg_reg = _rmm.get(*it); + + // first do collision check agains every contact associated fragment + // that is not already loaded !! + if (msg_reg->ctx().contains()) { + for (const FragmentID fid : msg_reg->ctx().get().frags) { + // TODO: better ctx caching code? + if (msg_reg->ctx().contains()) { + if (msg_reg->ctx().get().frags.contains(fid)) { + continue; + } + } + + auto fh = _fs.fragmentHandle(fid); + + if (!static_cast(fh)) { + return 0.05f; + } + + if (!fh.all_of()) { + return 0.05f; + } + + // get ts range of frag and collide with all curser(s/ranges) + const auto& [range_begin, range_end] = fh.get(); + + if (rangeVisible(range_begin, range_end, *msg_reg)) { + loadFragment(*msg_reg, fh); + return 0.05f; + } + } + // no new visible fragment + + // now, finally, check for adjecent fragments that need to be loaded + // we do this by finding a fragment in a rage + } else { + // contact has no fragments, skip + } + + _potentially_dirty_contacts.erase(it); + + return 0.05f; + } + + return 1000.f*60.f*60.f; } @@ -531,8 +695,6 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) // TODO: are we sure it is a *new* fragment? - //std::cout << "MFS: got frag for us!\n"; - Contact3 frag_contact = entt::null; { // get contact const auto& frag_contact_id = e.e.get().id; @@ -551,15 +713,19 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) } } - // only load if msg reg open - auto* msg_reg = static_cast(_rmm).get(frag_contact); + // create if not exist + auto* msg_reg = _rmm.get(frag_contact); if (msg_reg == nullptr) { // msg reg not created yet return false; } - // TODO: should this be done async / on tick() instead of on event? - loadFragment(*msg_reg, e.e); + if (!msg_reg->ctx().contains()) { + msg_reg->ctx().emplace(); + } + msg_reg->ctx().get().frags.emplace(e.e); + + _event_check_queue.push(ECQueueEntry{e.e, frag_contact}); return false; } diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index a167005..9e64e75 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -20,16 +21,40 @@ namespace Message::Components { using FUID = FragComp::ID; + // unused struct FID { FragmentID fid {entt::null}; }; + // points to the front/newer message + // together they define a range that is, + // eg the first(end) and last(begin) message being rendered + // MFS requires there to be atleast one other message after/before, + // if not loaded fragment with fitting tsrange(direction) available + // uses fragmentAfter/Before() + // they can exist standalone + // if they are a pair, the inside is filled first + // cursers require a timestamp ??? + struct ViewCurserBegin { + Message3 curser_end{entt::null}; + }; + struct ViewCurserEnd { + Message3 curser_begin{entt::null}; + }; + + // mfs will only load a limited number of fragments per tick (1), + // so this tag will be set if we loaded a fragment and + // every tick we check all cursers for this tag and continue + // and remove once no fragment could be loaded anymore + // (internal) + struct TagCurserUnsatisfied {}; + } // Message::Components namespace Fragment::Components { struct MessagesTSRange { // timestamp range within the fragment - uint64_t begin {0}; + uint64_t begin {0}; // newer msg -> higher number uint64_t end {0}; }; @@ -59,12 +84,23 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS void loadFragment(Message3Registry& reg, FragmentHandle fh); - struct QueueEntry final { + struct SaveQueueEntry final { uint64_t ts_since_dirty{0}; std::vector id; Message3Registry* reg{nullptr}; }; - std::queue _fuid_save_queue; + std::queue _fuid_save_queue; + + struct ECQueueEntry final { + FragmentID fid; + Contact3 c; + }; + std::queue _event_check_queue; + + // range changed or fragment loaded. + // we only load a limited number of fragments at once, + // so we need to keep them dirty until nothing was loaded. + entt::dense_set _potentially_dirty_contacts; public: MessageFragmentStore(