From 3d41eedf48e54e8a980774e282d0202c204656b7 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Tue, 13 Feb 2024 20:10:33 +0100 Subject: [PATCH] message fragment meta is saved, but still empty data --- src/fragment_store/message_fragment_store.cpp | 59 +++++++++++++++++-- src/fragment_store/message_fragment_store.hpp | 17 ++++-- src/main_screen.cpp | 1 + 3 files changed, 66 insertions(+), 11 deletions(-) diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index 2f1324c1..9009cfbc 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -1,11 +1,14 @@ #include "./message_fragment_store.hpp" +#include + #include #include #include #include +#include static bool serl_json_msg_ts_range(void* comp, nlohmann::json& out) { if (comp == nullptr) { @@ -40,8 +43,8 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { std::vector fragment_uid; // first search for fragment where the ts falls into the range - for (const auto& [tsrage, fid] : _fuid_open) { - if (tsrage.ts_begin <= msg_ts && tsrage.ts_end >= msg_ts) { + for (const auto& [ts_begin, ts_end, fid] : _fuid_open) { + if (ts_begin <= msg_ts && ts_end >= msg_ts) { fragment_uid = fid; // TODO: check conditions for open here // TODO: mark msg (and frag?) dirty @@ -50,32 +53,39 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // if it did not fit into an existing fragment, we next look for fragments that could be extended if (fragment_uid.empty()) { - for (const auto& [tsrage, fid] : _fuid_open) { - const int64_t frag_range = int64_t(tsrage.ts_end) - int64_t(tsrage.ts_begin); + for (auto& [ts_begin, ts_end, fid] : _fuid_open) { + const int64_t frag_range = int64_t(ts_end) - int64_t(ts_begin); //constexpr static int64_t max_frag_ts_extent {1000*60*60}; constexpr static int64_t max_frag_ts_extent {1000*60*3}; // 3min for testing const int64_t possible_extention = max_frag_ts_extent - frag_range; // which direction - if ((tsrage.ts_begin - possible_extention) <= msg_ts) { + if ((ts_begin - possible_extention) <= msg_ts && ts_begin > msg_ts) { fragment_uid = fid; auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fid)); assert(static_cast(fh)); + std::cout << "MFS: extended begin from " << ts_begin << " to " << msg_ts << "\n"; + // assuming ts range exists auto& fts_comp = fh.get(); fts_comp.begin = msg_ts; // extend into the past + ts_begin = msg_ts; + // TODO: check conditions for open here // TODO: mark msg (and frag?) dirty - } else if ((tsrage.ts_end + possible_extention) >= msg_ts) { + } else if ((ts_end + possible_extention) >= msg_ts && ts_end < msg_ts) { fragment_uid = fid; auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fid)); assert(static_cast(fh)); + std::cout << "MFS: extended end from " << ts_end << " to " << msg_ts << "\n"; + // assuming ts range exists auto& fts_comp = fh.get(); fts_comp.end = msg_ts; // extend into the future + ts_end = msg_ts; // TODO: check conditions for open here // TODO: mark msg (and frag?) dirty @@ -85,14 +95,34 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // if its still not found, we need a new fragment if (fragment_uid.empty()) { + const auto new_fid = _fs.newFragmentFile("test_message_store/", MetaFileType::TEXT_JSON); + auto fh = _fs.fragmentHandle(new_fid); + if (!static_cast(fh)) { + std::cout << "MFS error: failed to create new fragment for message\n"; + return; + } + + fragment_uid = fh.get().v; + + auto& new_ts_range = fh.emplace(); + new_ts_range.begin = msg_ts; + new_ts_range.end = msg_ts; + + _fuid_open.emplace_back(OpenFrag{msg_ts, msg_ts, fragment_uid}); + + std::cout << "MFS: created new fragment " << bin2hex(fragment_uid) << "\n"; } // if this is still empty, something is very wrong and we exit here if (fragment_uid.empty()) { + std::cout << "MFS error: failed to find/create fragment for message\n"; return; } m.emplace(fragment_uid); + + // in this case we know the fragment needs an update + _fuid_save_queue.push({Message::getTimeMS(), fragment_uid}); } // TODO: do we use fid? @@ -116,6 +146,21 @@ MessageFragmentStore::~MessageFragmentStore(void) { } float MessageFragmentStore::tick(float time_delta) { + // sync dirty fragments here + + if (!_fuid_save_queue.empty()) { + const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id); + auto j = nlohmann::json::object(); + // if save as binary + //nlohmann::json::to_msgpack(j); + auto j_dump = j.dump(2, ' ', true); + if (_fs.syncToStorage(fid, reinterpret_cast(j_dump.data()), j_dump.size())) { + std::cout << "MFS: dumped " << j_dump << "\n"; + // succ + _fuid_save_queue.pop(); + } + } + return 1000.f*60.f*60.f; } @@ -129,3 +174,5 @@ bool MessageFragmentStore::onEvent(const Message::Events::MessageUpdated& e) { return false; } +// TODO: handle deletes? diff between unload? + diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index aca4d508..ea76c25a 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -4,13 +4,14 @@ #include "./fragment_store_i.hpp" #include "./fragment_store.hpp" -#include #include #include #include -#include +#include +#include +#include namespace Message::Components { @@ -41,14 +42,20 @@ class MessageFragmentStore : public RegistryMessageModelEventI { void handleMessage(const Message3Handle& m); - struct TSRange final { + struct OpenFrag final { uint64_t ts_begin {0}; uint64_t ts_end {0}; + std::vector uid; }; // only contains fragments with <1024 messages and <28h tsrage - std::map> _fuid_open; + // TODO: this needs to move into the message reg + std::vector _fuid_open; - std::map> _fuid_save_queue; + struct QueueEntry final { + uint64_t ts_since_dirty{0}; + std::vector id; + }; + std::queue _fuid_save_queue; public: MessageFragmentStore( diff --git a/src/main_screen.cpp b/src/main_screen.cpp index 9e00875d..07c4d062 100644 --- a/src/main_screen.cpp +++ b/src/main_screen.cpp @@ -422,6 +422,7 @@ Screen* MainScreen::tick(float time_delta, bool& quit) { tdch.tick(time_delta); // compute mts.iterate(); // compute + mfs.tick(time_delta); // TODO: use delta _min_tick_interval = std::min( // HACK: pow by 1.6 to increase 50 -> ~500 (~522)