dump messages to data (some comps)
This commit is contained in:
parent
3d41eedf48
commit
58e9fd5514
@ -23,8 +23,8 @@ option(TOMATO_ASAN "Build tomato with asan (gcc/clang/msvc)" OFF)
|
|||||||
if (TOMATO_ASAN)
|
if (TOMATO_ASAN)
|
||||||
if (${CMAKE_CXX_COMPILER_ID} STREQUAL "GNU" OR ${CMAKE_CXX_COMPILER_ID} STREQUAL "Clang")
|
if (${CMAKE_CXX_COMPILER_ID} STREQUAL "GNU" OR ${CMAKE_CXX_COMPILER_ID} STREQUAL "Clang")
|
||||||
if (NOT WIN32) # exclude mingw
|
if (NOT WIN32) # exclude mingw
|
||||||
link_libraries(-fsanitize=address)
|
#link_libraries(-fsanitize=address)
|
||||||
#link_libraries(-fsanitize=address,undefined)
|
link_libraries(-fsanitize=address,undefined)
|
||||||
#link_libraries(-fsanitize=undefined)
|
#link_libraries(-fsanitize=undefined)
|
||||||
message("II enabled ASAN")
|
message("II enabled ASAN")
|
||||||
else()
|
else()
|
||||||
|
@ -10,6 +10,34 @@
|
|||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
||||||
|
namespace Message::Components {
|
||||||
|
|
||||||
|
// ctx
|
||||||
|
struct OpenFragments {
|
||||||
|
struct OpenFrag final {
|
||||||
|
uint64_t ts_begin {0};
|
||||||
|
uint64_t ts_end {0};
|
||||||
|
std::vector<uint8_t> uid;
|
||||||
|
};
|
||||||
|
// only contains fragments with <1024 messages and <28h tsrage
|
||||||
|
// TODO: this needs to move into the message reg
|
||||||
|
std::vector<OpenFrag> fuid_open;
|
||||||
|
};
|
||||||
|
|
||||||
|
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(Timestamp, ts)
|
||||||
|
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(ContactFrom, c)
|
||||||
|
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(ContactTo, c)
|
||||||
|
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessageText, text)
|
||||||
|
//NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(TagMessageIsAction, void)
|
||||||
|
|
||||||
|
} // Message::Components
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
static bool serl_json_default(void* comp, nlohmann::json& out) {
|
||||||
|
out = *reinterpret_cast<T*>(comp);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
static bool serl_json_msg_ts_range(void* comp, nlohmann::json& out) {
|
static bool serl_json_msg_ts_range(void* comp, nlohmann::json& out) {
|
||||||
if (comp == nullptr) {
|
if (comp == nullptr) {
|
||||||
return false;
|
return false;
|
||||||
@ -34,6 +62,13 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
|
|||||||
return; // we only handle msg with ts
|
return; // we only handle msg with ts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!m.registry()->ctx().contains<Message::Components::OpenFragments>()) {
|
||||||
|
// first message in this reg
|
||||||
|
m.registry()->ctx().emplace<Message::Components::OpenFragments>();
|
||||||
|
}
|
||||||
|
|
||||||
|
auto& fuid_open = m.registry()->ctx().get<Message::Components::OpenFragments>().fuid_open;
|
||||||
|
|
||||||
const auto msg_ts = m.get<Message::Components::Timestamp>().ts;
|
const auto msg_ts = m.get<Message::Components::Timestamp>().ts;
|
||||||
|
|
||||||
if (!m.all_of<Message::Components::FUID>()) {
|
if (!m.all_of<Message::Components::FUID>()) {
|
||||||
@ -43,7 +78,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
|
|||||||
std::vector<uint8_t> fragment_uid;
|
std::vector<uint8_t> fragment_uid;
|
||||||
|
|
||||||
// first search for fragment where the ts falls into the range
|
// first search for fragment where the ts falls into the range
|
||||||
for (const auto& [ts_begin, ts_end, fid] : _fuid_open) {
|
for (const auto& [ts_begin, ts_end, fid] : fuid_open) {
|
||||||
if (ts_begin <= msg_ts && ts_end >= msg_ts) {
|
if (ts_begin <= msg_ts && ts_end >= msg_ts) {
|
||||||
fragment_uid = fid;
|
fragment_uid = fid;
|
||||||
// TODO: check conditions for open here
|
// TODO: check conditions for open here
|
||||||
@ -53,7 +88,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
|
|||||||
|
|
||||||
// if it did not fit into an existing fragment, we next look for fragments that could be extended
|
// if it did not fit into an existing fragment, we next look for fragments that could be extended
|
||||||
if (fragment_uid.empty()) {
|
if (fragment_uid.empty()) {
|
||||||
for (auto& [ts_begin, ts_end, fid] : _fuid_open) {
|
for (auto& [ts_begin, ts_end, fid] : fuid_open) {
|
||||||
const int64_t frag_range = int64_t(ts_end) - int64_t(ts_begin);
|
const int64_t frag_range = int64_t(ts_end) - int64_t(ts_begin);
|
||||||
//constexpr static int64_t max_frag_ts_extent {1000*60*60};
|
//constexpr static int64_t max_frag_ts_extent {1000*60*60};
|
||||||
constexpr static int64_t max_frag_ts_extent {1000*60*3}; // 3min for testing
|
constexpr static int64_t max_frag_ts_extent {1000*60*3}; // 3min for testing
|
||||||
@ -108,7 +143,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
|
|||||||
new_ts_range.begin = msg_ts;
|
new_ts_range.begin = msg_ts;
|
||||||
new_ts_range.end = msg_ts;
|
new_ts_range.end = msg_ts;
|
||||||
|
|
||||||
_fuid_open.emplace_back(OpenFrag{msg_ts, msg_ts, fragment_uid});
|
fuid_open.emplace_back(Message::Components::OpenFragments::OpenFrag{msg_ts, msg_ts, fragment_uid});
|
||||||
|
|
||||||
std::cout << "MFS: created new fragment " << bin2hex(fragment_uid) << "\n";
|
std::cout << "MFS: created new fragment " << bin2hex(fragment_uid) << "\n";
|
||||||
}
|
}
|
||||||
@ -122,7 +157,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
|
|||||||
m.emplace<Message::Components::FUID>(fragment_uid);
|
m.emplace<Message::Components::FUID>(fragment_uid);
|
||||||
|
|
||||||
// in this case we know the fragment needs an update
|
// in this case we know the fragment needs an update
|
||||||
_fuid_save_queue.push({Message::getTimeMS(), fragment_uid});
|
_fuid_save_queue.push({Message::getTimeMS(), fragment_uid, m.registry()});
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: do we use fid?
|
// TODO: do we use fid?
|
||||||
@ -140,6 +175,18 @@ MessageFragmentStore::MessageFragmentStore(
|
|||||||
_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);
|
_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);
|
||||||
|
|
||||||
_fs._sc.registerSerializerJson<FragComp::MessagesTSRange>(serl_json_msg_ts_range);
|
_fs._sc.registerSerializerJson<FragComp::MessagesTSRange>(serl_json_msg_ts_range);
|
||||||
|
|
||||||
|
_sc.registerSerializerJson<Message::Components::Timestamp>(serl_json_default<Message::Components::Timestamp>);
|
||||||
|
_sc.registerSerializerJson<Message::Components::ContactFrom>(serl_json_default<Message::Components::ContactFrom>);
|
||||||
|
_sc.registerSerializerJson<Message::Components::ContactTo>(serl_json_default<Message::Components::ContactTo>);
|
||||||
|
_sc.registerSerializerJson<Message::Components::MessageText>(serl_json_default<Message::Components::MessageText>);
|
||||||
|
//_sc.registerSerializerJson<Message::Components::TagMessageIsAction>(serl_json_default<);
|
||||||
|
|
||||||
|
// files
|
||||||
|
//_sc.registerSerializerJson<Message::Components::Transfer::FileID>()
|
||||||
|
//_sc.registerSerializerJson<Message::Components::Transfer::FileInfo>();
|
||||||
|
//_sc.registerSerializerJson<Message::Components::Transfer::FileInfoLocal>();
|
||||||
|
//_sc.registerSerializerJson<Message::Components::Transfer::TagHaveAll>();
|
||||||
}
|
}
|
||||||
|
|
||||||
MessageFragmentStore::~MessageFragmentStore(void) {
|
MessageFragmentStore::~MessageFragmentStore(void) {
|
||||||
@ -150,7 +197,42 @@ float MessageFragmentStore::tick(float time_delta) {
|
|||||||
|
|
||||||
if (!_fuid_save_queue.empty()) {
|
if (!_fuid_save_queue.empty()) {
|
||||||
const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id);
|
const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id);
|
||||||
auto j = nlohmann::json::object();
|
auto* reg = _fuid_save_queue.front().reg;
|
||||||
|
assert(reg != nullptr);
|
||||||
|
auto j = nlohmann::json::array();
|
||||||
|
|
||||||
|
// TODO: does every message have ts?
|
||||||
|
auto msg_view = reg->view<Message::Components::Timestamp>();
|
||||||
|
// we also assume all messages have fuid (hack: call handle when not?)
|
||||||
|
for (const Message3 m : msg_view) {
|
||||||
|
if (!reg->all_of<Message::Components::FUID, Message::Components::ContactFrom, Message::Components::ContactTo>(m)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!reg->any_of<Message::Components::MessageText, Message::Components::Transfer::FileInfo>(m)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto& j_entry = j.emplace_back(nlohmann::json::object());
|
||||||
|
|
||||||
|
for (const auto& [type_id, storage] : reg->storage()) {
|
||||||
|
if (!storage.contains(m)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n";
|
||||||
|
|
||||||
|
// use type_id to find serializer
|
||||||
|
auto s_cb_it = _sc._serl_json.find(type_id);
|
||||||
|
if (s_cb_it == _sc._serl_json.end()) {
|
||||||
|
// could not find serializer, not saving
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
s_cb_it->second(storage.value(m), j_entry[storage.type().name()]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// if save as binary
|
// if save as binary
|
||||||
//nlohmann::json::to_msgpack(j);
|
//nlohmann::json::to_msgpack(j);
|
||||||
auto j_dump = j.dump(2, ' ', true);
|
auto j_dump = j.dump(2, ' ', true);
|
||||||
|
@ -40,20 +40,15 @@ class MessageFragmentStore : public RegistryMessageModelEventI {
|
|||||||
RegistryMessageModel& _rmm;
|
RegistryMessageModel& _rmm;
|
||||||
FragmentStore& _fs;
|
FragmentStore& _fs;
|
||||||
|
|
||||||
void handleMessage(const Message3Handle& m);
|
// for message components only
|
||||||
|
SerializerCallbacks _sc;
|
||||||
|
|
||||||
struct OpenFrag final {
|
void handleMessage(const Message3Handle& m);
|
||||||
uint64_t ts_begin {0};
|
|
||||||
uint64_t ts_end {0};
|
|
||||||
std::vector<uint8_t> uid;
|
|
||||||
};
|
|
||||||
// only contains fragments with <1024 messages and <28h tsrage
|
|
||||||
// TODO: this needs to move into the message reg
|
|
||||||
std::vector<OpenFrag> _fuid_open;
|
|
||||||
|
|
||||||
struct QueueEntry final {
|
struct QueueEntry final {
|
||||||
uint64_t ts_since_dirty{0};
|
uint64_t ts_since_dirty{0};
|
||||||
std::vector<uint8_t> id;
|
std::vector<uint8_t> id;
|
||||||
|
Message3Registry* reg{nullptr};
|
||||||
};
|
};
|
||||||
std::queue<QueueEntry> _fuid_save_queue;
|
std::queue<QueueEntry> _fuid_save_queue;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user