From 58e9fd551436df4f3bc77f65aae2b02071e928be Mon Sep 17 00:00:00 2001 From: Green Sky Date: Wed, 14 Feb 2024 00:58:21 +0100 Subject: [PATCH] dump messages to data (some comps) --- CMakeLists.txt | 4 +- src/fragment_store/message_fragment_store.cpp | 92 ++++++++++++++++++- src/fragment_store/message_fragment_store.hpp | 13 +-- 3 files changed, 93 insertions(+), 16 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d9000e4..856947f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,8 +23,8 @@ option(TOMATO_ASAN "Build tomato with asan (gcc/clang/msvc)" OFF) if (TOMATO_ASAN) if (${CMAKE_CXX_COMPILER_ID} STREQUAL "GNU" OR ${CMAKE_CXX_COMPILER_ID} STREQUAL "Clang") if (NOT WIN32) # exclude mingw - link_libraries(-fsanitize=address) - #link_libraries(-fsanitize=address,undefined) + #link_libraries(-fsanitize=address) + link_libraries(-fsanitize=address,undefined) #link_libraries(-fsanitize=undefined) message("II enabled ASAN") else() diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index 9009cfb..f67a300 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -10,6 +10,34 @@ #include #include +namespace Message::Components { + +// ctx +struct OpenFragments { + struct OpenFrag final { + uint64_t ts_begin {0}; + uint64_t ts_end {0}; + std::vector uid; + }; + // only contains fragments with <1024 messages and <28h tsrage + // TODO: this needs to move into the message reg + std::vector fuid_open; +}; + +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(Timestamp, ts) +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(ContactFrom, c) +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(ContactTo, c) +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessageText, text) +//NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(TagMessageIsAction, void) + +} // Message::Components + +template +static bool serl_json_default(void* comp, nlohmann::json& out) { + out = *reinterpret_cast(comp); + return true; +} + static bool serl_json_msg_ts_range(void* comp, nlohmann::json& out) { if (comp == nullptr) { return false; @@ -34,6 +62,13 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { return; // we only handle msg with ts } + if (!m.registry()->ctx().contains()) { + // first message in this reg + m.registry()->ctx().emplace(); + } + + auto& fuid_open = m.registry()->ctx().get().fuid_open; + const auto msg_ts = m.get().ts; if (!m.all_of()) { @@ -43,7 +78,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { std::vector fragment_uid; // first search for fragment where the ts falls into the range - for (const auto& [ts_begin, ts_end, fid] : _fuid_open) { + for (const auto& [ts_begin, ts_end, fid] : fuid_open) { if (ts_begin <= msg_ts && ts_end >= msg_ts) { fragment_uid = fid; // TODO: check conditions for open here @@ -53,7 +88,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // if it did not fit into an existing fragment, we next look for fragments that could be extended if (fragment_uid.empty()) { - for (auto& [ts_begin, ts_end, fid] : _fuid_open) { + for (auto& [ts_begin, ts_end, fid] : fuid_open) { const int64_t frag_range = int64_t(ts_end) - int64_t(ts_begin); //constexpr static int64_t max_frag_ts_extent {1000*60*60}; constexpr static int64_t max_frag_ts_extent {1000*60*3}; // 3min for testing @@ -108,7 +143,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { new_ts_range.begin = msg_ts; new_ts_range.end = msg_ts; - _fuid_open.emplace_back(OpenFrag{msg_ts, msg_ts, fragment_uid}); + fuid_open.emplace_back(Message::Components::OpenFragments::OpenFrag{msg_ts, msg_ts, fragment_uid}); std::cout << "MFS: created new fragment " << bin2hex(fragment_uid) << "\n"; } @@ -122,7 +157,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { m.emplace(fragment_uid); // in this case we know the fragment needs an update - _fuid_save_queue.push({Message::getTimeMS(), fragment_uid}); + _fuid_save_queue.push({Message::getTimeMS(), fragment_uid, m.registry()}); } // TODO: do we use fid? @@ -140,6 +175,18 @@ MessageFragmentStore::MessageFragmentStore( _rmm.subscribe(this, RegistryMessageModel_Event::message_destroy); _fs._sc.registerSerializerJson(serl_json_msg_ts_range); + + _sc.registerSerializerJson(serl_json_default); + _sc.registerSerializerJson(serl_json_default); + _sc.registerSerializerJson(serl_json_default); + _sc.registerSerializerJson(serl_json_default); + //_sc.registerSerializerJson(serl_json_default<); + + // files + //_sc.registerSerializerJson() + //_sc.registerSerializerJson(); + //_sc.registerSerializerJson(); + //_sc.registerSerializerJson(); } MessageFragmentStore::~MessageFragmentStore(void) { @@ -150,7 +197,42 @@ float MessageFragmentStore::tick(float time_delta) { if (!_fuid_save_queue.empty()) { const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id); - auto j = nlohmann::json::object(); + auto* reg = _fuid_save_queue.front().reg; + assert(reg != nullptr); + auto j = nlohmann::json::array(); + + // TODO: does every message have ts? + auto msg_view = reg->view(); + // we also assume all messages have fuid (hack: call handle when not?) + for (const Message3 m : msg_view) { + if (!reg->all_of(m)) { + continue; + } + + if (!reg->any_of(m)) { + continue; + } + + auto& j_entry = j.emplace_back(nlohmann::json::object()); + + for (const auto& [type_id, storage] : reg->storage()) { + if (!storage.contains(m)) { + continue; + } + + std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n"; + + // use type_id to find serializer + auto s_cb_it = _sc._serl_json.find(type_id); + if (s_cb_it == _sc._serl_json.end()) { + // could not find serializer, not saving + continue; + } + + s_cb_it->second(storage.value(m), j_entry[storage.type().name()]); + } + } + // if save as binary //nlohmann::json::to_msgpack(j); auto j_dump = j.dump(2, ' ', true); diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index ea76c25..8c3f35d 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -40,20 +40,15 @@ class MessageFragmentStore : public RegistryMessageModelEventI { RegistryMessageModel& _rmm; FragmentStore& _fs; - void handleMessage(const Message3Handle& m); + // for message components only + SerializerCallbacks _sc; - struct OpenFrag final { - uint64_t ts_begin {0}; - uint64_t ts_end {0}; - std::vector uid; - }; - // only contains fragments with <1024 messages and <28h tsrage - // TODO: this needs to move into the message reg - std::vector _fuid_open; + void handleMessage(const Message3Handle& m); struct QueueEntry final { uint64_t ts_since_dirty{0}; std::vector id; + Message3Registry* reg{nullptr}; }; std::queue _fuid_save_queue;