refactor message fuid -> fid

save alot of memory by using fid instead of fuid
This commit is contained in:
Green Sky 2024-03-03 12:20:41 +01:00
parent 71be5c3c6e
commit 7b8e93eec3
No known key found for this signature in database
2 changed files with 37 additions and 34 deletions

View File

@ -24,12 +24,12 @@ namespace Message::Components {
// ctx
struct OpenFragments {
struct OpenFrag final {
std::vector<uint8_t> uid;
};
// only contains fragments with <1024 messages and <28h tsrage
// TODO: this needs to move into the message reg
std::vector<OpenFrag> fuid_open;
//struct OpenFrag final {
////std::vector<uint8_t> uid;
//FragmentID id;
//};
// only contains fragments with <1024 messages and <28h tsrage (or whatever)
entt::dense_set<FragmentID> fid_open;
};
// all message fragments of this contact
@ -96,39 +96,39 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
}
// TODO: use fid, seving full fuid for every message consumes alot of memory (and heap frag)
if (!m.all_of<Message::Components::FUID>()) {
std::cout << "MFS: new msg missing FUID\n";
if (!m.all_of<Message::Components::FID>()) {
std::cout << "MFS: new msg missing FID\n";
if (!m.registry()->ctx().contains<Message::Components::OpenFragments>()) {
m.registry()->ctx().emplace<Message::Components::OpenFragments>();
}
auto& fuid_open = m.registry()->ctx().get<Message::Components::OpenFragments>().fuid_open;
auto& fid_open = m.registry()->ctx().get<Message::Components::OpenFragments>().fid_open;
const auto msg_ts = m.get<Message::Components::Timestamp>().ts;
// missing fuid
// find closesed non-sealed off fragment
std::vector<uint8_t> fragment_uid;
FragmentID fragment_id{entt::null};
// first search for fragment where the ts falls into the range
for (const auto& fuid : fuid_open) {
auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fuid.uid));
for (const auto& fid : fid_open) {
auto fh = _fs.fragmentHandle(fid);
assert(static_cast<bool>(fh));
// assuming ts range exists
auto& fts_comp = fh.get<FragComp::MessagesTSRange>();
if (fts_comp.begin <= msg_ts && fts_comp.end >= msg_ts) {
fragment_uid = fuid.uid;
fragment_id = fid;
// TODO: check conditions for open here
// TODO: mark msg (and frag?) dirty
}
}
// if it did not fit into an existing fragment, we next look for fragments that could be extended
if (fragment_uid.empty()) {
for (const auto& fuid : fuid_open) {
auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fuid.uid));
if (!_fs._reg.valid(fragment_id)) {
for (const auto& fid : fid_open) {
auto fh = _fs.fragmentHandle(fid);
assert(static_cast<bool>(fh));
// assuming ts range exists
@ -141,7 +141,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
// which direction
if ((fts_comp.begin - possible_extention) <= msg_ts && fts_comp.begin > msg_ts) {
fragment_uid = fuid.uid;
fragment_id = fid;
std::cout << "MFS: extended begin from " << fts_comp.begin << " to " << msg_ts << "\n";
@ -158,7 +158,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
// TODO: check conditions for open here
// TODO: mark msg (and frag?) dirty
} else if ((fts_comp.end + possible_extention) >= msg_ts && fts_comp.end < msg_ts) {
fragment_uid = fuid.uid;
fragment_id = fid;
std::cout << "MFS: extended end from " << fts_comp.end << " to " << msg_ts << "\n";
@ -178,7 +178,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
}
// if its still not found, we need a new fragment
if (fragment_uid.empty()) {
if (!_fs._reg.valid(fragment_id)) {
const auto new_fid = _fs.newFragmentFile("test_message_store/", MetaFileType::BINARY_MSGPACK);
auto fh = _fs.fragmentHandle(new_fid);
if (!static_cast<bool>(fh)) {
@ -186,7 +186,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
return;
}
fragment_uid = fh.get<FragComp::ID>().v;
fragment_id = fh;
fh.emplace_or_replace<FragComp::Ephemeral::MetaCompressionType>().comp = Compression::ZSTD;
fh.emplace_or_replace<FragComp::DataCompressionType>().comp = Compression::ZSTD;
@ -216,9 +216,9 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
}
m.registry()->ctx().get<Message::Components::LoadedContactFragments>().frags.emplace(fh);
fuid_open.emplace_back(Message::Components::OpenFragments::OpenFrag{fragment_uid});
fid_open.emplace(fragment_id);
std::cout << "MFS: created new fragment " << bin2hex(fragment_uid) << "\n";
std::cout << "MFS: created new fragment " << bin2hex(fh.get<FragComp::ID>().v) << "\n";
_fs_ignore_event = true;
_fs.throwEventConstruct(fh);
@ -226,17 +226,20 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
}
// if this is still empty, something is very wrong and we exit here
if (fragment_uid.empty()) {
if (!_fs._reg.valid(fragment_id)) {
std::cout << "MFS error: failed to find/create fragment for message\n";
return;
}
m.emplace<Message::Components::FUID>(fragment_uid);
m.emplace<Message::Components::FID>(fragment_id);
// in this case we know the fragment needs an update
_fuid_save_queue.push({Message::getTimeMS(), fragment_uid, m.registry()});
_fuid_save_queue.push({Message::getTimeMS(), fragment_id, m.registry()});
return; // done
}
//m.get<Message::Components::FID>();
// TODO: save updates, and not only new messages (read state etc)
// new fragment?, since we dont write to others fragments?
@ -288,7 +291,7 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh
}
}
new_real_msg.emplace_or_replace<Message::Components::FUID>(fh.get<FragComp::ID>());
new_real_msg.emplace_or_replace<Message::Components::FID>(fh);
// dup check (hacky, specific to protocols)
Message3 dup_msg {entt::null};
@ -421,11 +424,10 @@ static bool isLess(const std::vector<uint8_t>& lhs, const std::vector<uint8_t>&
float MessageFragmentStore::tick(float time_delta) {
// sync dirty fragments here
if (!_fuid_save_queue.empty()) {
const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id);
auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id);
auto* reg = _fuid_save_queue.front().reg;
assert(reg != nullptr);
auto fh = _fs.fragmentHandle(fid);
auto& ftsrange = fh.get_or_emplace<FragComp::MessagesTSRange>(Message::getTimeMS(), Message::getTimeMS());
auto j = nlohmann::json::array();
@ -436,7 +438,7 @@ float MessageFragmentStore::tick(float time_delta) {
for (auto it = msg_view.rbegin(), it_end = msg_view.rend(); it != it_end; it++) {
const Message3 m = *it;
if (!reg->all_of<Message::Components::FUID, Message::Components::ContactFrom, Message::Components::ContactTo>(m)) {
if (!reg->all_of<Message::Components::FID, Message::Components::ContactFrom, Message::Components::ContactTo>(m)) {
continue;
}
@ -445,7 +447,7 @@ float MessageFragmentStore::tick(float time_delta) {
continue;
}
if (_fuid_save_queue.front().id != reg->get<Message::Components::FUID>(m).v) {
if (_fuid_save_queue.front().id != reg->get<Message::Components::FID>(m).fid) {
continue; // not ours
}
@ -482,7 +484,7 @@ float MessageFragmentStore::tick(float time_delta) {
// if save as binary
//nlohmann::json::to_msgpack(j);
auto j_dump = j.dump(2, ' ', true);
if (_fs.syncToStorage(fid, reinterpret_cast<const uint8_t*>(j_dump.data()), j_dump.size())) {
if (_fs.syncToStorage(fh, reinterpret_cast<const uint8_t*>(j_dump.data()), j_dump.size())) {
//std::cout << "MFS: dumped " << j_dump << "\n";
// succ
_fuid_save_queue.pop();

View File

@ -19,9 +19,9 @@
namespace Message::Components {
using FUID = FragComp::ID;
// unused, consumes too much memory (highly compressable)
//using FUID = FragComp::ID;
// unused
struct FID {
FragmentID fid {entt::null};
};
@ -89,7 +89,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS
struct SaveQueueEntry final {
uint64_t ts_since_dirty{0};
std::vector<uint8_t> id;
//std::vector<uint8_t> id;
FragmentID id;
Message3Registry* reg{nullptr};
};
std::queue<SaveQueueEntry> _fuid_save_queue;