diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5edd7c8..8fc3573 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -64,12 +64,13 @@ target_link_libraries(message_fragment_store PUBLIC ######################################## -add_executable(convert_frag_to_obj +add_executable(convert_message_object_store fragment_store/convert_frag_to_obj.cpp ) -target_link_libraries(convert_frag_to_obj PUBLIC +target_link_libraries(convert_message_object_store PUBLIC fragment_store + message_fragment_store ) ######################################## diff --git a/src/fragment_store/convert_frag_to_obj.cpp b/src/fragment_store/convert_frag_to_obj.cpp index eccc9e9..41a69bd 100644 --- a/src/fragment_store/convert_frag_to_obj.cpp +++ b/src/fragment_store/convert_frag_to_obj.cpp @@ -2,6 +2,7 @@ #include "./backends/filesystem_storage.hpp" #include "./meta_components.hpp" #include "./serializer_json.hpp" +#include "./message_fragment_store.hpp" #include @@ -31,6 +32,13 @@ int main(int argc, const char** argv) { backend::FilesystemStorage fsb_src(os_src, argv[1]); backend::FilesystemStorage fsb_dst(os_dst, argv[2]); + Contact3Registry cr; // dummy + RegistryMessageModel rmm(cr); // dummy + // they only exist for the serializers (for now) + // TODO: version + MessageFragmentStore mfs_src(cr, rmm, os_src, fsb_src); + MessageFragmentStore mfs_dst(cr, rmm, os_dst, fsb_dst); + // add message fragment store too (adds meta?) // hookup events diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index c44b511..85f47b4 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -71,6 +71,7 @@ namespace Message::Components { } // Message::Components namespace ObjectStore::Components { + NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesVersion, v) NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesTSRange, begin, end) NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesContact, id) @@ -228,6 +229,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { fh.emplace_or_replace().comp = Compression::ZSTD; fh.emplace_or_replace().comp = Compression::ZSTD; + fh.emplace_or_replace(); // default is current auto& new_ts_range = fh.emplace_or_replace(); new_ts_range.begin = msg_ts; @@ -464,7 +466,11 @@ bool MessageFragmentStore::syncFragToStorage(ObjectHandle fh, Message3Registry& continue; } - s_cb_it->second(_sc, {reg, m}, j_entry[storage.type().name()]); + try { + s_cb_it->second(_sc, {reg, m}, j_entry[storage.type().name()]); + } catch (...) { + std::cerr << "MFS error: failed to serialize " << storage.type().name() << "(" << type_id << ")\n"; + } } } @@ -502,12 +508,14 @@ MessageFragmentStore::MessageFragmentStore( _rmm.subscribe(this, RegistryMessageModel_Event::message_destroy); auto& sjc = _os.registry().ctx().get>(); + sjc.registerSerializer(); + sjc.registerDeSerializer(); sjc.registerSerializer(); sjc.registerDeSerializer(); sjc.registerSerializer(); sjc.registerDeSerializer(); - // old + // old frag names sjc.registerSerializer(sjc.component_get_json); sjc.registerDeSerializer(sjc.component_emplace_or_replace_json); sjc.registerSerializer(sjc.component_get_json); @@ -625,6 +633,15 @@ float MessageFragmentStore::tick(float) { return 0.05f; } + if (!fh.all_of()) { + // missing version, adding + fh.emplace(); + } + if (fh.get().v != 1) { + std::cerr << "MFS: object with version mismatch\n"; + return 0.05f; + } + // get ts range of frag and collide with all curser(s/ranges) const auto& frag_range = fh.get(); @@ -837,6 +854,11 @@ bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectConstruct& e if (!e.e.all_of()) { return false; // not for us } + if (!e.e.all_of()) { + // missing version, adding + // version check is later + e.e.emplace(); + } // TODO: are we sure it is a *new* fragment? diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index 91f355d..b23d054 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -55,6 +55,12 @@ namespace Message::Components { } // Message::Components namespace ObjectStore::Components { + struct MessagesVersion { + // messages Object version + // 1 -> text_json + uint16_t v {1}; + }; + struct MessagesTSRange { // timestamp range within the fragment uint64_t begin {0}; // newer msg -> higher number