From 88ea3e177d5a8a64c39dacdb28ba39a7c70221d3 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sun, 3 Mar 2024 15:16:01 +0100 Subject: [PATCH] refactor saving and save on exit --- src/fragment_store/message_fragment_store.cpp | 140 ++++++++++-------- src/fragment_store/message_fragment_store.hpp | 2 + 2 files changed, 80 insertions(+), 62 deletions(-) diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index 7fd73b6..081090d 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -336,7 +336,7 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh } if (reg.valid(dup_msg)) { - // -> merge with preexisting + // -> merge with preexisting (needs to be order independent) // -> throw update reg.destroy(new_real_msg); //_rmm.throwEventUpdate(reg, new_real_msg); @@ -354,6 +354,75 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh } } +bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry& reg) { + auto& ftsrange = fh.get_or_emplace(Message::getTimeMS(), Message::getTimeMS()); + + auto j = nlohmann::json::array(); + + // TODO: does every message have ts? + auto msg_view = reg.view(); + // we also assume all messages have fid + for (auto it = msg_view.rbegin(), it_end = msg_view.rend(); it != it_end; it++) { + const Message3 m = *it; + + if (!reg.all_of(m)) { + continue; + } + + // require msg for now + if (!reg.any_of(m)) { + continue; + } + + if (_fuid_save_queue.front().id != reg.get(m).fid) { + continue; // not ours + } + + { // potentially adjust tsrange (some external processes can change timestamps) + const auto msg_ts = msg_view.get(m).ts; + if (ftsrange.begin > msg_ts) { + ftsrange.begin = msg_ts; + } else if (ftsrange.end < msg_ts) { + ftsrange.end = msg_ts; + } + } + + auto& j_entry = j.emplace_back(nlohmann::json::object()); + + for (const auto& [type_id, storage] : reg.storage()) { + if (!storage.contains(m)) { + continue; + } + + //std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n"; + + // use type_id to find serializer + auto s_cb_it = _sc._serl_json.find(type_id); + if (s_cb_it == _sc._serl_json.end()) { + // could not find serializer, not saving + //std::cout << "missing " << storage.type().name() << "(" << type_id << ")\n"; + continue; + } + + s_cb_it->second(_sc, {reg, m}, j_entry[storage.type().name()]); + } + } + + // we cant skip if array is empty (in theory it will not be empty later on) + + // if save as binary + //nlohmann::json::to_msgpack(j); + auto j_dump = j.dump(2, ' ', true); + if (_fs.syncToStorage(fh, reinterpret_cast(j_dump.data()), j_dump.size())) { + //std::cout << "MFS: dumped " << j_dump << "\n"; + // succ + return true; + } + + // TODO: error + return false; +} + MessageFragmentStore::MessageFragmentStore( Contact3Registry& cr, RegistryMessageModel& rmm, @@ -372,7 +441,13 @@ MessageFragmentStore::MessageFragmentStore( } MessageFragmentStore::~MessageFragmentStore(void) { - // TODO: sync all dirty fragments + while (!_fuid_save_queue.empty()) { + auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id); + auto* reg = _fuid_save_queue.front().reg; + assert(reg != nullptr); + syncFragToStorage(fh, *reg); + _fuid_save_queue.pop(); // pop unconditionally + } } MessageSerializerCallbacks& MessageFragmentStore::getMSC(void) { @@ -444,66 +519,7 @@ float MessageFragmentStore::tick(float time_delta) { auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id); auto* reg = _fuid_save_queue.front().reg; assert(reg != nullptr); - - auto& ftsrange = fh.get_or_emplace(Message::getTimeMS(), Message::getTimeMS()); - - auto j = nlohmann::json::array(); - - // TODO: does every message have ts? - auto msg_view = reg->view(); - // we also assume all messages have fuid (hack: call handle when not?) - for (auto it = msg_view.rbegin(), it_end = msg_view.rend(); it != it_end; it++) { - const Message3 m = *it; - - if (!reg->all_of(m)) { - continue; - } - - // require msg for now - if (!reg->any_of(m)) { - continue; - } - - if (_fuid_save_queue.front().id != reg->get(m).fid) { - continue; // not ours - } - - { // potentially adjust tsrange (some external processes can change timestamps) - const auto msg_ts = msg_view.get(m).ts; - if (ftsrange.begin > msg_ts) { - ftsrange.begin = msg_ts; - } else if (ftsrange.end < msg_ts) { - ftsrange.end = msg_ts; - } - } - - auto& j_entry = j.emplace_back(nlohmann::json::object()); - - for (const auto& [type_id, storage] : reg->storage()) { - if (!storage.contains(m)) { - continue; - } - - //std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n"; - - // use type_id to find serializer - auto s_cb_it = _sc._serl_json.find(type_id); - if (s_cb_it == _sc._serl_json.end()) { - // could not find serializer, not saving - //std::cout << "missing " << storage.type().name() << "(" << type_id << ")\n"; - continue; - } - - s_cb_it->second(_sc, {*reg, m}, j_entry[storage.type().name()]); - } - } - - // if save as binary - //nlohmann::json::to_msgpack(j); - auto j_dump = j.dump(2, ' ', true); - if (_fs.syncToStorage(fh, reinterpret_cast(j_dump.data()), j_dump.size())) { - //std::cout << "MFS: dumped " << j_dump << "\n"; - // succ + if (syncFragToStorage(fh, *reg)) { _fuid_save_queue.pop(); } } diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index e1b08dc..3ad7628 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -87,6 +87,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS void loadFragment(Message3Registry& reg, FragmentHandle fh); + bool syncFragToStorage(FragmentHandle fh, Message3Registry& reg); + struct SaveQueueEntry final { uint64_t ts_since_dirty{0}; //std::vector id;