From 7b8e93eec3eee2d001cb3f23ecc89c0d14c84f38 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sun, 3 Mar 2024 12:20:41 +0100 Subject: [PATCH] refactor message fuid -> fid save alot of memory by using fid instead of fuid --- src/fragment_store/message_fragment_store.cpp | 64 ++++++++++--------- src/fragment_store/message_fragment_store.hpp | 7 +- 2 files changed, 37 insertions(+), 34 deletions(-) diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index ade8203b..1af217db 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -24,12 +24,12 @@ namespace Message::Components { // ctx struct OpenFragments { - struct OpenFrag final { - std::vector uid; - }; - // only contains fragments with <1024 messages and <28h tsrage - // TODO: this needs to move into the message reg - std::vector fuid_open; + //struct OpenFrag final { + ////std::vector uid; + //FragmentID id; + //}; + // only contains fragments with <1024 messages and <28h tsrage (or whatever) + entt::dense_set fid_open; }; // all message fragments of this contact @@ -96,39 +96,39 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { } // TODO: use fid, seving full fuid for every message consumes alot of memory (and heap frag) - if (!m.all_of()) { - std::cout << "MFS: new msg missing FUID\n"; + if (!m.all_of()) { + std::cout << "MFS: new msg missing FID\n"; if (!m.registry()->ctx().contains()) { m.registry()->ctx().emplace(); } - auto& fuid_open = m.registry()->ctx().get().fuid_open; + auto& fid_open = m.registry()->ctx().get().fid_open; const auto msg_ts = m.get().ts; // missing fuid // find closesed non-sealed off fragment - std::vector fragment_uid; + FragmentID fragment_id{entt::null}; // first search for fragment where the ts falls into the range - for (const auto& fuid : fuid_open) { - auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fuid.uid)); + for (const auto& fid : fid_open) { + auto fh = _fs.fragmentHandle(fid); assert(static_cast(fh)); // assuming ts range exists auto& fts_comp = fh.get(); if (fts_comp.begin <= msg_ts && fts_comp.end >= msg_ts) { - fragment_uid = fuid.uid; + fragment_id = fid; // TODO: check conditions for open here // TODO: mark msg (and frag?) dirty } } // if it did not fit into an existing fragment, we next look for fragments that could be extended - if (fragment_uid.empty()) { - for (const auto& fuid : fuid_open) { - auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fuid.uid)); + if (!_fs._reg.valid(fragment_id)) { + for (const auto& fid : fid_open) { + auto fh = _fs.fragmentHandle(fid); assert(static_cast(fh)); // assuming ts range exists @@ -141,7 +141,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // which direction if ((fts_comp.begin - possible_extention) <= msg_ts && fts_comp.begin > msg_ts) { - fragment_uid = fuid.uid; + fragment_id = fid; std::cout << "MFS: extended begin from " << fts_comp.begin << " to " << msg_ts << "\n"; @@ -158,7 +158,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // TODO: check conditions for open here // TODO: mark msg (and frag?) dirty } else if ((fts_comp.end + possible_extention) >= msg_ts && fts_comp.end < msg_ts) { - fragment_uid = fuid.uid; + fragment_id = fid; std::cout << "MFS: extended end from " << fts_comp.end << " to " << msg_ts << "\n"; @@ -178,7 +178,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { } // if its still not found, we need a new fragment - if (fragment_uid.empty()) { + if (!_fs._reg.valid(fragment_id)) { const auto new_fid = _fs.newFragmentFile("test_message_store/", MetaFileType::BINARY_MSGPACK); auto fh = _fs.fragmentHandle(new_fid); if (!static_cast(fh)) { @@ -186,7 +186,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { return; } - fragment_uid = fh.get().v; + fragment_id = fh; fh.emplace_or_replace().comp = Compression::ZSTD; fh.emplace_or_replace().comp = Compression::ZSTD; @@ -216,9 +216,9 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { } m.registry()->ctx().get().frags.emplace(fh); - fuid_open.emplace_back(Message::Components::OpenFragments::OpenFrag{fragment_uid}); + fid_open.emplace(fragment_id); - std::cout << "MFS: created new fragment " << bin2hex(fragment_uid) << "\n"; + std::cout << "MFS: created new fragment " << bin2hex(fh.get().v) << "\n"; _fs_ignore_event = true; _fs.throwEventConstruct(fh); @@ -226,17 +226,20 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { } // if this is still empty, something is very wrong and we exit here - if (fragment_uid.empty()) { + if (!_fs._reg.valid(fragment_id)) { std::cout << "MFS error: failed to find/create fragment for message\n"; return; } - m.emplace(fragment_uid); + m.emplace(fragment_id); // in this case we know the fragment needs an update - _fuid_save_queue.push({Message::getTimeMS(), fragment_uid, m.registry()}); + _fuid_save_queue.push({Message::getTimeMS(), fragment_id, m.registry()}); + return; // done } + //m.get(); + // TODO: save updates, and not only new messages (read state etc) // new fragment?, since we dont write to others fragments? @@ -288,7 +291,7 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh } } - new_real_msg.emplace_or_replace(fh.get()); + new_real_msg.emplace_or_replace(fh); // dup check (hacky, specific to protocols) Message3 dup_msg {entt::null}; @@ -421,11 +424,10 @@ static bool isLess(const std::vector& lhs, const std::vector& float MessageFragmentStore::tick(float time_delta) { // sync dirty fragments here if (!_fuid_save_queue.empty()) { - const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id); + auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id); auto* reg = _fuid_save_queue.front().reg; assert(reg != nullptr); - auto fh = _fs.fragmentHandle(fid); auto& ftsrange = fh.get_or_emplace(Message::getTimeMS(), Message::getTimeMS()); auto j = nlohmann::json::array(); @@ -436,7 +438,7 @@ float MessageFragmentStore::tick(float time_delta) { for (auto it = msg_view.rbegin(), it_end = msg_view.rend(); it != it_end; it++) { const Message3 m = *it; - if (!reg->all_of(m)) { + if (!reg->all_of(m)) { continue; } @@ -445,7 +447,7 @@ float MessageFragmentStore::tick(float time_delta) { continue; } - if (_fuid_save_queue.front().id != reg->get(m).v) { + if (_fuid_save_queue.front().id != reg->get(m).fid) { continue; // not ours } @@ -482,7 +484,7 @@ float MessageFragmentStore::tick(float time_delta) { // if save as binary //nlohmann::json::to_msgpack(j); auto j_dump = j.dump(2, ' ', true); - if (_fs.syncToStorage(fid, reinterpret_cast(j_dump.data()), j_dump.size())) { + if (_fs.syncToStorage(fh, reinterpret_cast(j_dump.data()), j_dump.size())) { //std::cout << "MFS: dumped " << j_dump << "\n"; // succ _fuid_save_queue.pop(); diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index ea854e91..e1b08dc9 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -19,9 +19,9 @@ namespace Message::Components { - using FUID = FragComp::ID; + // unused, consumes too much memory (highly compressable) + //using FUID = FragComp::ID; - // unused struct FID { FragmentID fid {entt::null}; }; @@ -89,7 +89,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS struct SaveQueueEntry final { uint64_t ts_since_dirty{0}; - std::vector id; + //std::vector id; + FragmentID id; Message3Registry* reg{nullptr}; }; std::queue _fuid_save_queue;