commit 715b7cfa58e82ac3ceb45bb2b4bb726826150dd4 Author: Green Sky Date: Sun Apr 14 13:12:25 2024 +0200 mfs with plugin, wip bc plugin provides the backend (hardcoded) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..56f48bf --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +.vs/ +*.o +*.swp +~* +*~ +.idea/ +cmake-build-debug/ +cmake-build-debugandtest/ +cmake-build-release/ +*.stackdump +*.coredump +compile_commands.json +/build* +/result* +.clangd +.cache + +.DS_Store +.AppleDouble +.LSOverride + +CMakeLists.txt.user* +CMakeCache.txt + +*.tox +imgui.ini diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..f6bd8f3 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,72 @@ +cmake_minimum_required(VERSION 3.24 FATAL_ERROR) + +# cmake setup begin +project(solanaceae_message_fragment_store) + +if (CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR) + set(SOLANACEAE_MESSAGE_FRAGMENT_STORE_STANDALONE ON) +else() + set(SOLANACEAE_MESSAGE_FRAGMENT_STORE_STANDALONE OFF) +endif() +message("II SOLANACEAE_MESSAGE_FRAGMENT_STORE_STANDALONE " ${SOLANACEAE_MESSAGE_FRAGMENT_STORE_STANDALONE}) + +option(SOLANACEAE_MESSAGE_FRAGMENT_STORE_BUILD_PLUGINS "Build the solanaceae_message_fragment_store plugins" ${SOLANACEAE_MESSAGE_FRAGMENT_STORE_STANDALONE}) + +if (SOLANACEAE_MESSAGE_FRAGMENT_STORE_STANDALONE) + set(CMAKE_POSITION_INDEPENDENT_CODE ON) + + # defaulting to debug mode, if not specified + if(NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE "Debug") + endif() + + # setup my vim ycm :D + set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + + # more paths + set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib") + set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin") + set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin") +endif() + +# external libs +add_subdirectory(./external EXCLUDE_FROM_ALL) # before increasing warn levels, sad :( + +if (SOLANACEAE_MESSAGE_FRAGMENT_STORE_STANDALONE) + set(CMAKE_CXX_EXTENSIONS OFF) + + # bump up 
warning levels appropriately for clang, gcc & msvc + if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU" OR "${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") + add_compile_options( + -Wall -Wextra # Reasonable and standard + -Wpedantic # Warn if non-standard C++ is used + -Wunused # Warn on anything being unused + #-Wconversion # Warn on type conversions that may lose data + #-Wsign-conversion # Warn on sign conversions + -Wshadow # Warn if a variable declaration shadows one from a parent context + ) + + if (NOT WIN32) + #link_libraries(-fsanitize=address) + #link_libraries(-fsanitize=address,undefined) + #link_libraries(-fsanitize-address-use-after-scope) + #link_libraries(-fsanitize=undefined) + endif() + elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "MSVC") + if (CMAKE_CXX_FLAGS MATCHES "/W[0-4]") + string(REGEX REPLACE "/W[0-4]" "/W4" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}") + else() + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W4") + endif() + endif() + +endif() + +# cmake setup end + +add_subdirectory(./src) + +if (SOLANACEAE_MESSAGE_FRAGMENT_STORE_BUILD_PLUGINS) + add_subdirectory(./plugins) +endif() + diff --git a/external/CMakeLists.txt b/external/CMakeLists.txt new file mode 100644 index 0000000..2e3b5cc --- /dev/null +++ b/external/CMakeLists.txt @@ -0,0 +1,61 @@ +cmake_minimum_required(VERSION 3.24 FATAL_ERROR) + +include(FetchContent) + +if (NOT TARGET solanaceae_util) + FetchContent_Declare(solanaceae_util + GIT_REPOSITORY https://github.com/Green-Sky/solanaceae_util.git + GIT_TAG master + ) + FetchContent_MakeAvailable(solanaceae_util) +endif() + +if (NOT TARGET solanaceae_plugin) + FetchContent_Declare(solanaceae_plugin + GIT_REPOSITORY https://github.com/Green-Sky/solanaceae_plugin.git + GIT_TAG master + ) + FetchContent_MakeAvailable(solanaceae_plugin) +endif() + +if (NOT TARGET solanaceae_contact) + FetchContent_Declare(solanaceae_contact + GIT_REPOSITORY https://github.com/Green-Sky/solanaceae_contact.git + GIT_TAG master + ) + 
FetchContent_MakeAvailable(solanaceae_contact) +endif() + +if (NOT TARGET solanaceae_message3) + FetchContent_Declare(solanaceae_message3 + GIT_REPOSITORY https://github.com/Green-Sky/solanaceae_message3.git + GIT_TAG master + ) + FetchContent_MakeAvailable(solanaceae_message3) +endif() + +if (NOT TARGET solanaceae_message_serializer) + FetchContent_Declare(solanaceae_message_serializer + GIT_REPOSITORY https://github.com/Green-Sky/solanaceae_message_serializer.git + GIT_TAG master + ) + FetchContent_MakeAvailable(solanaceae_message_serializer) +endif() + +if (NOT TARGET solanaceae_object_store) + FetchContent_Declare(solanaceae_object_store + GIT_REPOSITORY https://github.com/Green-Sky/solanaceae_object_store.git + GIT_TAG master + ) + FetchContent_MakeAvailable(solanaceae_object_store) +endif() + +if (NOT TARGET nlohmann_json::nlohmann_json) + FetchContent_Declare(json + URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz + URL_HASH SHA256=d6c65aca6b1ed68e7a182f4757257b107ae403032760ed6ef121c9d55e81757d + EXCLUDE_FROM_ALL + ) + FetchContent_MakeAvailable(json) +endif() + diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt new file mode 100644 index 0000000..8aaf976 --- /dev/null +++ b/plugins/CMakeLists.txt @@ -0,0 +1,14 @@ +cmake_minimum_required(VERSION 3.9...3.24 FATAL_ERROR) + +######################################## + +add_library(plugin_mfs_wip SHARED + ./plugin_mfs_wip.cpp +) +target_compile_features(plugin_mfs_wip PUBLIC cxx_std_17) +target_link_libraries(plugin_mfs_wip PUBLIC + solanaceae_plugin + solanaceae_message_fragment_store + solanaceae_object_store_backend_filesystem +) + diff --git a/plugins/plugin_mfs_wip.cpp b/plugins/plugin_mfs_wip.cpp new file mode 100644 index 0000000..602f1f9 --- /dev/null +++ b/plugins/plugin_mfs_wip.cpp @@ -0,0 +1,73 @@ +#include +#include + +#include +#include +#include + +#include +#include +#include + +static std::unique_ptr g_mfs = nullptr; +static std::unique_ptr g_fsb = nullptr; + 
+constexpr const char* plugin_name = "MessageFragmentStore"; + +extern "C" { + +SOLANA_PLUGIN_EXPORT const char* solana_plugin_get_name(void) { + return plugin_name; +} + +SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_get_version(void) { + return SOLANA_PLUGIN_VERSION; +} + +SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api) { + std::cout << "PLUGIN " << plugin_name << " START()\n"; + + if (solana_api == nullptr) { + return 1; + } + + try { + auto* cr = PLUG_RESOLVE_INSTANCE_VERSIONED(Contact3Registry, "1"); + auto* rmm = PLUG_RESOLVE_INSTANCE(RegistryMessageModel); + auto* os = PLUG_RESOLVE_INSTANCE(ObjectStore2); + auto* msnj = PLUG_RESOLVE_INSTANCE(MessageSerializerNJ); + + // static store, could be anywhere tho + // construct with fetched dependencies + g_fsb = std::make_unique(*os, "test2_message_store/"); // TODO: use config? + g_mfs = std::make_unique(*cr, *rmm, *os, *g_fsb, *msnj); + + // register types + PLUG_PROVIDE_INSTANCE(MessageFragmentStore, plugin_name, g_mfs.get()); + } catch (const ResolveException& e) { + std::cerr << "PLUGIN " << plugin_name << " " << e.what << "\n"; + return 2; + } + + return 0; +} + +SOLANA_PLUGIN_EXPORT void solana_plugin_stop(void) { + std::cout << "PLUGIN " << plugin_name << " STOP()\n"; + + g_mfs.reset(); // reverse construction order: the store was built with *g_fsb and still writes queued fragments in its destructor + g_fsb.reset(); // safe now, nothing references the backend anymore +} + +SOLANA_PLUGIN_EXPORT float solana_plugin_tick(float time_delta) { + static bool scan_triggered {false}; + if (!scan_triggered) { + g_fsb->scanAsync(); + scan_triggered = true; + } + + return g_mfs->tick(time_delta); +} + +} // extern C + diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..5adca01 --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required(VERSION 3.9...3.24 FATAL_ERROR) + +project(solanaceae) + +add_library(solanaceae_message_fragment_store + ./solanaceae/message_fragment_store/meta_messages_components.hpp + ./solanaceae/message_fragment_store/meta_messages_components_id.inl + 
./solanaceae/message_fragment_store/internal_mfs_contexts.hpp + ./solanaceae/message_fragment_store/internal_mfs_contexts.cpp + ./solanaceae/message_fragment_store/message_fragment_store.hpp + ./solanaceae/message_fragment_store/message_fragment_store.cpp +) + +target_include_directories(solanaceae_message_fragment_store PUBLIC .) +target_compile_features(solanaceae_message_fragment_store PUBLIC cxx_std_17) +target_link_libraries(solanaceae_message_fragment_store PUBLIC + solanaceae_util + solanaceae_message3 + solanaceae_message_serializer + solanaceae_object_store + nlohmann_json::nlohmann_json +) + +######################################## + diff --git a/src/solanaceae/message_fragment_store/internal_mfs_contexts.cpp b/src/solanaceae/message_fragment_store/internal_mfs_contexts.cpp new file mode 100644 index 0000000..cadb952 --- /dev/null +++ b/src/solanaceae/message_fragment_store/internal_mfs_contexts.cpp @@ -0,0 +1,149 @@ +#include "./internal_mfs_contexts.hpp" + +#include "./message_fragment_store.hpp" + +#include +#include +#include + +#include + +static bool isLess(const std::vector& lhs, const std::vector& rhs) { + size_t i = 0; + for (; i < lhs.size() && i < rhs.size(); i++) { + if (lhs[i] < rhs[i]) { + return true; + } else if (lhs[i] > rhs[i]) { + return false; + } + // else continue + } + + // here we have equality of common lenths + + // we define smaller arrays to be less + return lhs.size() < rhs.size(); +} + +bool Message::Contexts::ContactFragments::insert(ObjectHandle frag) { + if (sorted_frags.contains(frag)) { + return false; + } + + // both sorted arrays are sorted ascending + // so for insertion we search for the last index that is <= and insert after it + // or we search for the first > (or end) and insert before it <--- + // since equal fragments are UB, we can assume they are only > or < + + size_t begin_index {0}; + { // begin + const auto pos = std::find_if( + sorted_begin.cbegin(), + sorted_begin.cend(), + [frag](const Object a) -> 
bool { + const auto begin_a = frag.registry()->get(a).begin; + const auto begin_frag = frag.get().begin; + if (begin_a > begin_frag) { + return true; + } else if (begin_a < begin_frag) { + return false; + } else { + // equal ts, we need to fall back to id (id can not be equal) + return isLess(frag.get().v, frag.registry()->get(a).v); + } + } + ); + + begin_index = std::distance(sorted_begin.cbegin(), pos); + + // we need to insert before pos (end is valid here) + sorted_begin.insert(pos, frag); + } + + size_t end_index {0}; + { // end, same search as for begin: first element that sorts after frag + const auto pos = std::find_if( + sorted_end.cbegin(), + sorted_end.cend(), + [frag](const Object a) -> bool { + const auto end_a = frag.registry()->get(a).end; + const auto end_frag = frag.get().end; + if (end_a > end_frag) { + return true; + } else if (end_a < end_frag) { + return false; + } else { + // equal ts, we need to fall back to id (id can not be equal) + return isLess(frag.get().v, frag.registry()->get(a).v); + } + } + ); + + end_index = std::distance(sorted_end.cbegin(), pos); + + // we need to insert before pos (end is valid here) + sorted_end.insert(pos, frag); + } + + sorted_frags.emplace(frag, InternalEntry{begin_index, end_index}); + + // now adjust all indices of fragments coming after the insert position + for (size_t i = begin_index + 1; i < sorted_begin.size(); i++) { + sorted_frags.at(sorted_begin[i]).i_b = i; + } + for (size_t i = end_index + 1; i < sorted_end.size(); i++) { + sorted_frags.at(sorted_end[i]).i_e = i; + } + + return true; +} + +bool Message::Contexts::ContactFragments::erase(Object frag) { + auto frags_it = sorted_frags.find(frag); + if (frags_it == sorted_frags.end()) { + return false; // not tracked, nothing removed + } + const auto [i_b, i_e] = frags_it->second; // copy the indices, the map entry is erased below + assert(sorted_begin.size() == sorted_end.size()); + assert(sorted_begin.size() > i_b); + sorted_begin.erase(sorted_begin.begin() + i_b); + sorted_end.erase(sorted_end.begin() + i_e); + sorted_frags.erase(frags_it); + // everything behind the removed slots moved down by one, refresh the cached indices (mirrors insert()) + for (size_t i = i_b; i < sorted_begin.size(); i++) { sorted_frags.at(sorted_begin[i]).i_b = i; } + for (size_t i = i_e; i < sorted_end.size(); i++) { sorted_frags.at(sorted_end[i]).i_e = i; } + return true; +} + +Object 
Message::Contexts::ContactFragments::prev(Object frag) const { + // uses range begin to go back in time + + auto it = sorted_frags.find(frag); + if (it == sorted_frags.end()) { + return entt::null; + } + + const auto src_i = it->second.i_b; + if (src_i > 0) { + return sorted_begin[src_i-1]; + } + + return entt::null; +} + +Object Message::Contexts::ContactFragments::next(Object frag) const { + // uses range end to go forward in time + + auto it = sorted_frags.find(frag); + if (it == sorted_frags.end()) { + return entt::null; + } + + const auto src_i = it->second.i_e; + if (src_i+1 < sorted_end.size()) { + return sorted_end[src_i+1]; + } + + return entt::null; +} + diff --git a/src/solanaceae/message_fragment_store/internal_mfs_contexts.hpp b/src/solanaceae/message_fragment_store/internal_mfs_contexts.hpp new file mode 100644 index 0000000..4283de8 --- /dev/null +++ b/src/solanaceae/message_fragment_store/internal_mfs_contexts.hpp @@ -0,0 +1,53 @@ +#pragma once + +#include + +#include +#include + +// everything assumes a single object registry (and unique objects) + +namespace Message::Contexts { + + // ctx + struct OpenFragments { + // only contains fragments with <1024 messages and <2h tsrage (or whatever) + entt::dense_set open_frags; + }; + + // all message fragments of this contact + struct ContactFragments final { + // kept up-to-date by events + struct InternalEntry { + // indecies into the sorted arrays + size_t i_b; + size_t i_e; + }; + entt::dense_map sorted_frags; + + // add 2 sorted contact lists for both range begin and end + // TODO: adding and removing becomes expensive with enough frags, consider splitting or heap + std::vector sorted_begin; + std::vector sorted_end; + + // api + // return true if it was actually inserted + bool insert(ObjectHandle frag); + bool erase(Object frag); + // update? 
(just erase() + insert()) + + // uses range begin to go back in time + Object prev(Object frag) const; + // uses range end to go forward in time + Object next(Object frag) const; + }; + + // all LOADED message fragments + // TODO: merge into ContactFragments (and pull in openfrags) + struct LoadedContactFragments final { + // kept up-to-date by events + entt::dense_set loaded_frags; + }; + +} // Message::Contexts + diff --git a/src/solanaceae/message_fragment_store/message_fragment_store.cpp b/src/solanaceae/message_fragment_store/message_fragment_store.cpp new file mode 100644 index 0000000..66d5094 --- /dev/null +++ b/src/solanaceae/message_fragment_store/message_fragment_store.cpp @@ -0,0 +1,922 @@ +#include "./message_fragment_store.hpp" + +#include "./internal_mfs_contexts.hpp" + +#include + +#include + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +// https://youtu.be/CU2exyhYPfA + +// everything assumes a single object registry (and unique objects) + +namespace ObjectStore::Components { + NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesVersion, v) + NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesTSRange, begin, end) + NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesContact, id) + + namespace Ephemeral { + // does not contain any messges + // (recheck on frag update) + struct MessagesEmptyTag {}; + + // cache the contact for faster lookups + struct MessagesContactEntity { + Contact3 e {entt::null}; + }; + } +} // ObjectStore::Component + +static nlohmann::json loadFromStorageNJ(ObjectHandle oh) { + assert(oh.all_of()); + auto* backend = oh.get().ptr; + assert(backend != nullptr); + + std::vector tmp_buffer; + std::function cb = [&tmp_buffer](const ByteSpan buffer) { + tmp_buffer.insert(tmp_buffer.end(), buffer.cbegin(), buffer.cend()); + }; + if (!backend->read(oh, cb)) { + std::cerr << "failed to read obj '" << bin2hex(oh.get().v) << "'\n"; + return false; + } + + const auto obj_version = oh.get().v; + + if (obj_version == 
1) { + return nlohmann::json::parse(tmp_buffer, nullptr, false); + } else if (obj_version == 2) { + return nlohmann::json::from_msgpack(tmp_buffer, true, false); + } else { + assert(false); + return {}; + } +} + +void MessageFragmentStore::handleMessage(const Message3Handle& m) { + if (_fs_ignore_event) { + // message event because of us loading a fragment, ignore + // TODO: this barely makes a difference + return; + } + + if (!static_cast(m)) { + return; // huh? + } + + if (!m.all_of()) { + return; // we only handle msg with ts + } + + _potentially_dirty_contacts.emplace(m.registry()->ctx().get()); // always mark dirty here + if (m.any_of()) { + // not an actual message, but we probalby need to check and see if we need to load fragments + //std::cout << "MFS: new or updated curser\n"; + return; + } + + // TODO: this is bad, we need a non persistence tag instead + if (!m.any_of()) { + // skip everything else for now + return; + } + + // TODO: use fid, seving full fuid for every message consumes alot of memory (and heap frag) + if (!m.all_of()) { + std::cout << "MFS: new msg missing Object\n"; + if (!m.registry()->ctx().contains()) { + m.registry()->ctx().emplace(); + } + + auto& fid_open = m.registry()->ctx().get().open_frags; + + const auto msg_ts = m.get().ts; + // missing fuid + // find closesed non-sealed off fragment + + Object fragment_id{entt::null}; + + // first search for fragment where the ts falls into the range + for (const auto& fid : fid_open) { + auto fh = _os.objectHandle(fid); + assert(static_cast(fh)); + + // assuming ts range exists + auto& fts_comp = fh.get(); + + if (fts_comp.begin <= msg_ts && fts_comp.end >= msg_ts) { + fragment_id = fid; + // TODO: check conditions for open here + // TODO: mark msg (and frag?) 
dirty + } + } + + // if it did not fit into an existing fragment, we next look for fragments that could be extended + if (!_os._reg.valid(fragment_id)) { + for (const auto& fid : fid_open) { + auto fh = _os.objectHandle(fid); + assert(static_cast(fh)); + + // assuming ts range exists + auto& fts_comp = fh.get(); + + const int64_t frag_range = int64_t(fts_comp.end) - int64_t(fts_comp.begin); + constexpr static int64_t max_frag_ts_extent {1000*60*60}; + //constexpr static int64_t max_frag_ts_extent {1000*60*3}; // 3min for testing + const int64_t possible_extention = max_frag_ts_extent - frag_range; + + // which direction + if ((fts_comp.begin - possible_extention) <= msg_ts && fts_comp.begin > msg_ts) { + fragment_id = fid; + + std::cout << "MFS: extended begin from " << fts_comp.begin << " to " << msg_ts << "\n"; + + // assuming ts range exists + fts_comp.begin = msg_ts; // extend into the past + + if (m.registry()->ctx().contains()) { + // should be the case + m.registry()->ctx().get().erase(fh); + m.registry()->ctx().get().insert(fh); + } + + + // TODO: check conditions for open here + // TODO: mark msg (and frag?) dirty + } else if ((fts_comp.end + possible_extention) >= msg_ts && fts_comp.end < msg_ts) { + fragment_id = fid; + + std::cout << "MFS: extended end from " << fts_comp.end << " to " << msg_ts << "\n"; + + // assuming ts range exists + fts_comp.end = msg_ts; // extend into the future + + if (m.registry()->ctx().contains()) { + // should be the case + m.registry()->ctx().get().erase(fh); + m.registry()->ctx().get().insert(fh); + } + + // TODO: check conditions for open here + // TODO: mark msg (and frag?) 
dirty + } + } + } + + // if its still not found, we need a new fragment + if (!_os.registry().valid(fragment_id)) { + const auto new_uuid = _session_uuid_gen(); + _fs_ignore_event = true; + auto fh = _sb.newObject(ByteSpan{new_uuid}); + _fs_ignore_event = false; + if (!static_cast(fh)) { + std::cout << "MFS error: failed to create new object for message\n"; + return; + } + + fragment_id = fh; + + fh.emplace_or_replace().comp = Compression::ZSTD; + fh.emplace_or_replace().comp = Compression::ZSTD; + fh.emplace_or_replace(); // default is current + + auto& new_ts_range = fh.emplace_or_replace(); + new_ts_range.begin = msg_ts; + new_ts_range.end = msg_ts; + + { + const auto msg_reg_contact = m.registry()->ctx().get(); + if (_cr.all_of(msg_reg_contact)) { + fh.emplace(_cr.get(msg_reg_contact).data); + } else { + // ? rage quit? + } + } + + // contact frag + if (!m.registry()->ctx().contains()) { + m.registry()->ctx().emplace(); + } + m.registry()->ctx().get().insert(fh); + + // loaded contact frag + if (!m.registry()->ctx().contains()) { + m.registry()->ctx().emplace(); + } + m.registry()->ctx().get().loaded_frags.emplace(fh); + + fid_open.emplace(fragment_id); + + std::cout << "MFS: created new fragment " << bin2hex(fh.get().v) << "\n"; + + _fs_ignore_event = true; + _os.throwEventConstruct(fh); + _fs_ignore_event = false; + } + + // if this is still empty, something is very wrong and we exit here + if (!_os.registry().valid(fragment_id)) { + std::cout << "MFS error: failed to find/create fragment for message\n"; + return; + } + + m.emplace_or_replace(fragment_id); + + // in this case we know the fragment needs an update + for (const auto& it : _frag_save_queue) { + if (it.id == fragment_id) { + // already in queue + return; // done + } + } + _frag_save_queue.push_back({Message::getTimeMS(), {_os.registry(), fragment_id}, m.registry()}); + return; // done + } + + const auto msg_fh = _os.objectHandle(m.get().o); + if (!static_cast(msg_fh)) { + std::cerr << "MFS error: 
fid in message is invalid\n"; + return; // TODO: properly handle this case + } + + if (!m.registry()->ctx().contains()) { + m.registry()->ctx().emplace(); + } + + auto& fid_open = m.registry()->ctx().get().open_frags; + + if (fid_open.contains(msg_fh)) { + // TODO: dedup events + // TODO: cooldown per fragsave + _frag_save_queue.push_back({Message::getTimeMS(), msg_fh, m.registry()}); + return; + } + + // TODO: save updates to old fragments, but writing them to a new fragment that would overwrite on merge + // new fragment?, since we dont write to others fragments? + + + // on new message: assign fuid + // on new and update: mark as fragment dirty +} + +// assumes not loaded frag +// need update from frag +void MessageFragmentStore::loadFragment(Message3Registry& reg, ObjectHandle fh) { + std::cout << "MFS: loadFragment\n"; + // version HAS to be set, or we just fail + if (!fh.all_of()) { + std::cerr << "MFS error: nope, object without version, cant load\n"; + return; + } + + nlohmann::json j; + const auto obj_version = fh.get().v; + if (obj_version == 1 || obj_version == 2) { + j = loadFromStorageNJ(fh); // also handles version and json/msgpack + } else { + std::cerr << "MFS error: nope, object with unknown version, cant load\n"; + return; + } + + if (!j.is_array()) { + // wrong data + fh.emplace_or_replace(); + return; + } + + if (j.size() == 0) { + // empty array + fh.emplace_or_replace(); + return; + } + + // TODO: this should probably never be the case, since we already know here that it is a msg frag + if (!reg.ctx().contains()) { + reg.ctx().emplace(); + } + reg.ctx().get().insert(fh); + + // mark loaded + if (!reg.ctx().contains()) { + reg.ctx().emplace(); + } + reg.ctx().get().loaded_frags.emplace(fh); + + size_t messages_new_or_updated {0}; + for (const auto& j_entry : j) { + auto new_real_msg = Message3Handle{reg, reg.create()}; + // load into staging reg + for (const auto& [k, v] : j_entry.items()) { + //std::cout << "K:" << k << " V:" << v.dump() << 
"\n"; + const auto type_id = entt::hashed_string(k.data(), k.size()); + const auto deserl_fn_it = _scnj._deserl_json.find(type_id); + if (deserl_fn_it != _scnj._deserl_json.cend()) { + try { + if (!deserl_fn_it->second(_scnj, new_real_msg, v)) { + std::cerr << "MFS error: failed deserializing '" << k << "'\n"; + } + } catch(...) { + std::cerr << "MFS error: failed deserializing (threw) '" << k << "'\n"; + } + } else { + std::cerr << "MFS warning: missing deserializer for meta key '" << k << "'\n"; + } + } + + new_real_msg.emplace_or_replace(fh); + + // dup check (hacky, specific to protocols) + Message3 dup_msg {entt::null}; + { + // get comparator from contact + if (reg.ctx().contains()) { + const auto c = reg.ctx().get(); + if (_cr.all_of(c)) { + auto& comp = _cr.get(c).comp; + // walking EVERY existing message OOF + // this needs optimizing + for (const Message3 other_msg : reg.view()) { + if (other_msg == new_real_msg) { + continue; // skip self + } + + if (comp({reg, other_msg}, new_real_msg)) { + // dup + dup_msg = other_msg; + break; + } + } + } + } + } + + if (reg.valid(dup_msg)) { + // -> merge with preexisting (needs to be order independent) + // -> throw update + reg.destroy(new_real_msg); + //messages_new_or_updated++; // TODO: how do i know on merging, if data was useful + //_rmm.throwEventUpdate(reg, new_real_msg); + } else { + if (!new_real_msg.all_of()) { + // does not have needed components to be stand alone + reg.destroy(new_real_msg); + std::cerr << "MFS warning: message with missing basic compoments\n"; + continue; + } + + messages_new_or_updated++; + // -> throw create + _rmm.throwEventConstruct(reg, new_real_msg); + } + } + + if (messages_new_or_updated == 0) { + // useless frag + // TODO: unload? 
+ fh.emplace_or_replace(); + } +} + +bool MessageFragmentStore::syncFragToStorage(ObjectHandle fh, Message3Registry& reg) { + auto& ftsrange = fh.get_or_emplace(Message::getTimeMS(), Message::getTimeMS()); + + auto j = nlohmann::json::array(); + + // TODO: does every message have ts? + auto msg_view = reg.view(); + // we also assume all messages have an associated object + for (auto it = msg_view.rbegin(), it_end = msg_view.rend(); it != it_end; it++) { + const Message3 m = *it; + + if (!reg.all_of(m)) { + continue; + } + + // filter: require msg for now + // this will be removed in the future + if (!reg.any_of(m)) { + continue; + } + + if (_frag_save_queue.front().id != reg.get(m).o) { + continue; // not ours + } + + { // potentially adjust tsrange (some external processes can change timestamps) + const auto msg_ts = msg_view.get(m).ts; + if (ftsrange.begin > msg_ts) { + ftsrange.begin = msg_ts; + } else if (ftsrange.end < msg_ts) { + ftsrange.end = msg_ts; + } + } + + auto& j_entry = j.emplace_back(nlohmann::json::object()); + + for (const auto& [type_id, storage] : reg.storage()) { + if (!storage.contains(m)) { + continue; + } + + //std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n"; + + // use type_id to find serializer + auto s_cb_it = _scnj._serl_json.find(type_id); + if (s_cb_it == _scnj._serl_json.end()) { + // could not find serializer, not saving + //std::cout << "missing " << storage.type().name() << "(" << type_id << ")\n"; + continue; + } + + try { + s_cb_it->second(_scnj, {reg, m}, j_entry[storage.type().name()]); + } catch (...) 
{ + std::cerr << "MFS error: failed to serialize " << storage.type().name() << "(" << type_id << ")\n"; + } + } + } + + // we cant skip if array is empty (in theory it will not be empty later on) + + std::vector data_to_save; + const auto obj_version = fh.get_or_emplace().v; + if (obj_version == 1) { + auto j_dump = j.dump(2, ' ', true); + data_to_save = std::vector(j_dump.cbegin(), j_dump.cend()); + } else if (obj_version == 2) { + data_to_save = nlohmann::json::to_msgpack(j); + } else { + std::cerr << "MFS error: unknown object version\n"; + assert(false); + } + assert(fh.all_of()); + auto* backend = fh.get().ptr; + if (backend->write(fh, {reinterpret_cast(data_to_save.data()), data_to_save.size()})) { + // TODO: make this better, should this be called on fail? should this be called before sync? (prob not) + _fs_ignore_event = true; + _os.throwEventUpdate(fh); + _fs_ignore_event = false; + + //std::cout << "MFS: dumped " << j_dump << "\n"; + // succ + return true; + } + + // TODO: error + return false; +} + +MessageFragmentStore::MessageFragmentStore( + Contact3Registry& cr, + RegistryMessageModel& rmm, + ObjectStore2& os, + StorageBackendI& sb, + MessageSerializerNJ& scnj +) : _cr(cr), _rmm(rmm), _os(os), _sb(sb), _scnj(scnj) { + _rmm.subscribe(this, RegistryMessageModel_Event::message_construct); + _rmm.subscribe(this, RegistryMessageModel_Event::message_updated); + _rmm.subscribe(this, RegistryMessageModel_Event::message_destroy); + + // TODO: move somewhere else? 
+ auto& sjc = _os.registry().ctx().get>(); + sjc.registerSerializer(); + sjc.registerDeSerializer(); + sjc.registerSerializer(); + sjc.registerDeSerializer(); + sjc.registerSerializer(); + sjc.registerDeSerializer(); + + // old frag names + sjc.registerSerializer(sjc.component_get_json); + sjc.registerDeSerializer(sjc.component_emplace_or_replace_json); + sjc.registerSerializer(sjc.component_get_json); + sjc.registerDeSerializer(sjc.component_emplace_or_replace_json); + + _os.subscribe(this, ObjectStore_Event::object_construct); + _os.subscribe(this, ObjectStore_Event::object_update); +} + +MessageFragmentStore::~MessageFragmentStore(void) { + while (!_frag_save_queue.empty()) { + auto fh = _frag_save_queue.front().id; + auto* reg = _frag_save_queue.front().reg; + assert(reg != nullptr); + syncFragToStorage(fh, *reg); + _frag_save_queue.pop_front(); // pop unconditionally + } +} + +// checks range against all cursers in msgreg +static bool rangeVisible(uint64_t range_begin, uint64_t range_end, const Message3Registry& msg_reg) { + // 1D collision checks: + // - for range vs range: + // r1 rhs >= r0 lhs AND r1 lhs <= r0 rhs + // - for range vs point: + // p >= r0 lhs AND p <= r0 rhs + // NOTE: directions for us are reversed (begin has larger values as end) + + auto c_b_view = msg_reg.view(); + c_b_view.use(); + for (const auto& [m, ts_begin_comp, vcb] : c_b_view.each()) { + // p and r1 rhs can be seen as the same + // but first we need to know if a curser begin is a point or a range + + // TODO: margin? + auto ts_begin = ts_begin_comp.ts; + auto ts_end = ts_begin_comp.ts; // simplyfy code by making a single begin curser act as an infinitly small range + if (msg_reg.valid(vcb.curser_end) && msg_reg.all_of(vcb.curser_end)) { + // TODO: respect curser end's begin? 
+ // TODO: remember which ends we checked and check remaining + ts_end = msg_reg.get(vcb.curser_end).ts; + + // sanity check curser order + if (ts_end > ts_begin) { + std::cerr << "MFS warning: begin curser and end curser of view swapped!!\n"; + std::swap(ts_begin, ts_end); + } + } + + // no overlap if the view [ts_end, ts_begin] lies entirely before or after [range_begin, range_end] + if (ts_begin < range_begin || ts_end > range_end) { + continue; + } + + // range hits a view + return true; + } + + return false; +} + +float MessageFragmentStore::tick(float) { + const auto ts_now = Message::getTimeMS(); + // sync dirty fragments here + if (!_frag_save_queue.empty()) { + // wait 10sec before saving + if (_frag_save_queue.front().ts_since_dirty + 10*1000 <= ts_now) { + auto fh = _frag_save_queue.front().id; + auto* reg = _frag_save_queue.front().reg; + assert(reg != nullptr); + if (syncFragToStorage(fh, *reg)) { + _frag_save_queue.pop_front(); + } + } + } + + // load needed fragments here + + // last check event frags + // only checks if it collides with ranges, not adjacent + // bc ~range~ msgreg will be marked dirty and checked next tick + const bool had_events = !_event_check_queue.empty(); + for (size_t i = 0; i < 10 && !_event_check_queue.empty(); i++) { + std::cout << "MFS: event check\n"; + auto fh = _event_check_queue.front().fid; + auto c = _event_check_queue.front().c; + _event_check_queue.pop_front(); + + if (!static_cast(fh)) { + return 0.05f; + } + + if (!fh.all_of()) { + return 0.05f; + } + + if (!fh.all_of()) { + // missing version, adding + fh.emplace(); + } + const auto object_version = fh.get().v; + // TODO: move this early version check somewhere else + if (object_version != 1 && object_version != 2) { + std::cerr << "MFS: object with version mismatch\n"; + return 0.05f; + } + + // get ts range of frag and collide with all curser(s/ranges) + const auto& frag_range = fh.get(); + + auto* msg_reg = _rmm.get(c); + if (msg_reg == nullptr) { + return 0.05f; + } + + if (rangeVisible(frag_range.begin, frag_range.end, 
!msg_reg)) { + loadFragment(*msg_reg, fh); + _potentially_dirty_contacts.emplace(c); + return 0.05f; // only one but soon again + } + } + if (had_events) { + std::cout << "MFS: event check none\n"; + return 0.05f; // only check events, even if non where hit + } + + if (!_potentially_dirty_contacts.empty()) { + //std::cout << "MFS: pdc\n"; + // here we check if any view of said contact needs frag loading + // only once per tick tho + + // TODO: this makes order depend on internal order and is not fair + auto it = _potentially_dirty_contacts.cbegin(); + + auto* msg_reg = _rmm.get(*it); + + // first do collision check agains every contact associated fragment + // that is not already loaded !! + if (msg_reg->ctx().contains()) { + const auto& cf = msg_reg->ctx().get(); + if (!cf.sorted_frags.empty()) { + if (!msg_reg->ctx().contains()) { + msg_reg->ctx().emplace(); + } + const auto& loaded_frags = msg_reg->ctx().get().loaded_frags; + + for (const auto& [fid, si] : msg_reg->ctx().get().sorted_frags) { + if (loaded_frags.contains(fid)) { + continue; + } + + auto fh = _os.objectHandle(fid); + + if (!static_cast(fh)) { + std::cerr << "MFS error: frag is invalid\n"; + // WHAT + msg_reg->ctx().get().erase(fid); + return 0.05f; + } + + if (!fh.all_of()) { + std::cerr << "MFS error: frag has no range\n"; + // ???? 
+ msg_reg->ctx().get().erase(fid); + return 0.05f; + } + + if (fh.all_of()) { + continue; // skip known empty + } + + // get ts range of frag and collide with all curser(s/ranges) + const auto& [range_begin, range_end] = fh.get(); + + if (rangeVisible(range_begin, range_end, *msg_reg)) { + std::cout << "MFS: frag hit by vis range\n"; + loadFragment(*msg_reg, fh); + return 0.05f; + } + } + // no new visible fragment + //std::cout << "MFS: no new frag directly visible\n"; + + // now, finally, check for adjecent fragments that need to be loaded + // we do this by finding the outermost fragment in a rage, and extend it by one + + // TODO: rewrite using some bounding range tree to perform collision checks !!! + // (this is now performing better, but still) + + + // for each view + auto c_b_view = msg_reg->view(); + c_b_view.use(); + for (const auto& [_, ts_begin_comp, vcb] : c_b_view.each()) { + // aka "scroll down" + { // find newest(-ish) frag in range + // or in reverse frag end <= range begin + + + // lower bound of frag end and range begin + const auto right = std::lower_bound( + cf.sorted_end.crbegin(), + cf.sorted_end.crend(), + ts_begin_comp.ts, + [&](const Object element, const auto& value) -> bool { + return _os.registry().get(element).end >= value; + } + ); + + Object next_frag{entt::null}; + if (right != cf.sorted_end.crend()) { + next_frag = cf.next(*right); + } + // we checked earlier that cf is not empty + if (!_os.registry().valid(next_frag)) { + // fall back to closest, cf is not empty + next_frag = cf.sorted_end.front(); + } + + // a single adjacent frag is often not enough + // only ok bc next is cheap + for (size_t i = 0; i < 5 && _os.registry().valid(next_frag); next_frag = cf.next(next_frag)) { + auto fh = _os.objectHandle(next_frag); + if (fh.any_of()) { + continue; // skip known empty + } + + if (!loaded_frags.contains(next_frag)) { + std::cout << "MFS: next frag of range\n"; + loadFragment(*msg_reg, fh); + return 0.05f; + } + + i++; + } + } + + 
// curser end + if (!msg_reg->valid(vcb.curser_end) || !msg_reg->all_of(vcb.curser_end)) { + continue; + } + const auto ts_end = msg_reg->get(vcb.curser_end).ts; + + // aka "scroll up" + { // find oldest(-ish) frag in range + // frag begin >= range end + + // lower bound of frag begin and range end + const auto left = std::lower_bound( + cf.sorted_begin.cbegin(), + cf.sorted_begin.cend(), + ts_end, + [&](const Object element, const auto& value) -> bool { + return _os.registry().get(element).begin < value; + } + ); + + Object prev_frag{entt::null}; + if (left != cf.sorted_begin.cend()) { + prev_frag = cf.prev(*left); + } + // we checked earlier that cf is not empty + if (!_os.registry().valid(prev_frag)) { + // fall back to closest, cf is not empty + prev_frag = cf.sorted_begin.back(); + } + + // a single adjacent frag is often not enough + // only ok bc prev is cheap + for (size_t i = 0; i < 5 && _os.registry().valid(prev_frag); prev_frag = cf.prev(prev_frag)) { + auto fh = _os.objectHandle(prev_frag); + if (fh.any_of()) { + continue; // skip known empty + } + + if (!loaded_frags.contains(prev_frag)) { + std::cout << "MFS: prev frag of range\n"; + loadFragment(*msg_reg, fh); + return 0.05f; + } + + i++; + } + } + } + } + } else { + // contact has no fragments, skip + } + + _potentially_dirty_contacts.erase(it); + + return 0.05f; + } + + + return 1000.f*60.f*60.f; +} + +bool MessageFragmentStore::onEvent(const Message::Events::MessageConstruct& e) { // forwards the event's message to handleMessage(); always returns false + handleMessage(e.e); + return false; +} + +bool MessageFragmentStore::onEvent(const Message::Events::MessageUpdated& e) { // same handling as MessageConstruct: forward to handleMessage(); always returns false + handleMessage(e.e); + return false; +} + +// TODO: handle deletes? how does a delete differ from an unload? 
+ +bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectConstruct& e) { // on a new object: resolve its contact, (re)insert it into the contact's message registry ctx and queue it in _event_check_queue; always returns false + if (_fs_ignore_event) { + return false; // skip self + } + + if (!e.e.all_of()) { + return false; // not for us + } + if (!e.e.all_of()) { + // missing version, adding + // version check is later + e.e.emplace(); + } + + // TODO: are we sure it is a *new* fragment? + + Contact3 frag_contact = entt::null; + { // get contact + const auto& frag_contact_id = e.e.get().id; + // TODO: id lookup table, this linear scan over the contact view is very inefficient + for (const auto& [c_it, id_it] : _cr.view().each()) { + if (frag_contact_id == id_it.data) { + frag_contact = c_it; + break; + } + } + if (!_cr.valid(frag_contact)) { + // unknown contact + return false; + } + e.e.emplace_or_replace(frag_contact); + } + + // create if not exist + auto* msg_reg = _rmm.get(frag_contact); + if (msg_reg == nullptr) { + // msg reg not created yet + // TODO: this is an erroneous path + return false; + } + + if (!msg_reg->ctx().contains()) { + msg_reg->ctx().emplace(); + } + msg_reg->ctx().get().erase(e.e); // TODO: can this happen? 
update + msg_reg->ctx().get().insert(e.e); + + _event_check_queue.push_back(ECQueueEntry{e.e, frag_contact}); + + return false; +} + +bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectUpdate& e) { // on an updated object: resolve its contact (cached component first, id scan as fallback) and (re)insert it into the registry ctx; no check is queued yet (see TODO at the end); always returns false + if (_fs_ignore_event) { + return false; // skip self + } + + if (!e.e.all_of()) { + return false; // not for us + } + + // since it's an update, we might have it associated, or not + // it's also possible it was tagged as empty + e.e.remove(); + + Contact3 frag_contact = entt::null; + { // get contact + // probably cached already + if (e.e.all_of()) { + frag_contact = e.e.get().e; + } + + if (!_cr.valid(frag_contact)) { + const auto& frag_contact_id = e.e.get().id; + // TODO: id lookup table, this linear scan over the contact view is very inefficient + for (const auto& [c_it, id_it] : _cr.view().each()) { + if (frag_contact_id == id_it.data) { + frag_contact = c_it; + break; + } + } + if (!_cr.valid(frag_contact)) { + // unknown contact + return false; + } + e.e.emplace_or_replace(frag_contact); + } + } + + // create if not exist + auto* msg_reg = _rmm.get(frag_contact); + if (msg_reg == nullptr) { + // msg reg not created yet + // TODO: this is an erroneous path + return false; + } + + if (!msg_reg->ctx().contains()) { + msg_reg->ctx().emplace(); + } + msg_reg->ctx().get().erase(e.e); // TODO: check/update/fragment update + msg_reg->ctx().get().insert(e.e); + + // TODO: actually load it + //_event_check_queue.push_back(ECQueueEntry{e.e, frag_contact}); + + return false; +} + diff --git a/src/solanaceae/message_fragment_store/message_fragment_store.hpp b/src/solanaceae/message_fragment_store/message_fragment_store.hpp new file mode 100644 index 0000000..f31a4a1 --- /dev/null +++ b/src/solanaceae/message_fragment_store/message_fragment_store.hpp @@ -0,0 +1,107 @@ +#pragma once + +#include +#include + +#include + +#include + +#include "./meta_messages_components.hpp" + +#include +#include + +#include +#include + +#include +#include +#include + +namespace Message::Components { + + // unused, 
consumes too much memory (highly compressible) + //using FUID = FragComp::ID; + + struct Obj { + // message fragment's object + Object o {entt::null}; + }; + + // TODO: add adjacency range comp or inside curser + + // TODO: unused + // mfs will only load a limited number of fragments per tick (1), + // so this tag will be set if we loaded a fragment and + // every tick we check all cursers for this tag and continue + // and remove it once no fragment could be loaded anymore + // (internal) + struct TagCurserUnsatisfied {}; + +} // Message::Components + +// handles fragments for messages +// on new message: assign fuid +// on new and update: mark as fragment dirty +// on delete: mark as fragment dirty? +class MessageFragmentStore : public RegistryMessageModelEventI, public ObjectStoreEventI { + public: + static constexpr const char* version {"3"}; + + protected: + Contact3Registry& _cr; + RegistryMessageModel& _rmm; + ObjectStore2& _os; + StorageBackendI& _sb; + MessageSerializerNJ& _scnj; + + bool _fs_ignore_event {false}; // while set, the ObjectStore onEvent handlers return early ("skip self") + UUIDGenerator_128_128 _session_uuid_gen; + + void handleMessage(const Message3Handle& m); + + void loadFragment(Message3Registry& reg, ObjectHandle oh); + + bool syncFragToStorage(ObjectHandle oh, Message3Registry& reg); + + struct SaveQueueEntry final { + uint64_t ts_since_dirty{0}; + //std::vector id; + ObjectHandle id; + Message3Registry* reg{nullptr}; + }; + std::deque _frag_save_queue; + + struct ECQueueEntry final { + ObjectHandle fid; + Contact3 c; + }; + std::deque _event_check_queue; + + // range changed or fragment loaded. + // we only load a limited number of fragments at once, + // so we need to keep them dirty until nothing was loaded. 
+ entt::dense_set _potentially_dirty_contacts; + + public: + MessageFragmentStore( + Contact3Registry& cr, + RegistryMessageModel& rmm, + ObjectStore2& os, + StorageBackendI& sb, + MessageSerializerNJ& scnj + ); + virtual ~MessageFragmentStore(void); + + float tick(float time_delta); + + protected: // rmm (message events) + bool onEvent(const Message::Events::MessageConstruct& e) override; + bool onEvent(const Message::Events::MessageUpdated& e) override; + + protected: // os (object store events) + bool onEvent(const ObjectStore::Events::ObjectConstruct& e) override; + bool onEvent(const ObjectStore::Events::ObjectUpdate& e) override; +}; + diff --git a/src/solanaceae/message_fragment_store/meta_messages_components.hpp b/src/solanaceae/message_fragment_store/meta_messages_components.hpp new file mode 100644 index 0000000..d0847fb --- /dev/null +++ b/src/solanaceae/message_fragment_store/meta_messages_components.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include + +namespace ObjectStore::Components { + struct MessagesVersion { + // messages Object version + // 1 -> text_json + // 2 -> msgpack + uint16_t v {2}; + }; + + struct MessagesTSRange { + // timestamp range within the fragment + uint64_t begin {0}; // newer msg -> higher number + uint64_t end {0}; + }; + + struct MessagesContact { + std::vector id; + }; + + // TODO: add src contact (self id) + +} // ObjectStore::Components + +// old names, kept as types deriving from the ObjComp components +namespace Fragment::Components { + struct MessagesTSRange : public ObjComp::MessagesTSRange {}; + struct MessagesContact : public ObjComp::MessagesContact {}; +} // Fragment::Components + +#include "./meta_messages_components.hpp" // NOTE(review): this header includes itself (a no-op under #pragma once); presumably ./meta_messages_components_id.inl was intended - confirm + diff --git a/src/solanaceae/message_fragment_store/meta_messages_components_id.inl b/src/solanaceae/message_fragment_store/meta_messages_components_id.inl new file mode 100644 index 0000000..9c46944 --- /dev/null +++ b/src/solanaceae/message_fragment_store/meta_messages_components_id.inl @@ -0,0 +1,31 @@ +#pragma once + +#include "./meta_messages_components.hpp" + +#include + +// TODO: move 
more central +#define DEFINE_COMP_ID(x) \ +template<> \ +constexpr entt::id_type entt::type_hash<x>::value() noexcept { \ + using namespace entt::literals; \ + return #x##_hs; \ +} \ +template<> \ +constexpr std::string_view entt::type_name<x>::value() noexcept { \ + return #x; \ +} + +// cross-compiler stable ids: DEFINE_COMP_ID(x) specializes entt::type_hash<x> and entt::type_name<x> so both are derived from the literal spelling of x (#x) + +DEFINE_COMP_ID(ObjComp::MessagesVersion) +DEFINE_COMP_ID(ObjComp::MessagesTSRange) +DEFINE_COMP_ID(ObjComp::MessagesContact) + +// old stuff +//DEFINE_COMP_ID(FragComp::MessagesTSRange) +//DEFINE_COMP_ID(FragComp::MessagesContact) + +#undef DEFINE_COMP_ID + +