diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ffd0531..5edd7c8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -12,13 +12,6 @@ add_library(fragment_store ./fragment_store/meta_components_id.inl ./fragment_store/file2_stack.hpp ./fragment_store/file2_stack.cpp - #old - ./fragment_store/serializer.hpp - ./fragment_store/fragment_store_i.hpp - ./fragment_store/fragment_store_i.cpp - ./fragment_store/fragment_store.hpp - ./fragment_store/fragment_store.cpp - #new ./fragment_store/serializer_json.hpp ./fragment_store/object_store.hpp ./fragment_store/object_store.cpp @@ -71,16 +64,6 @@ target_link_libraries(message_fragment_store PUBLIC ######################################## -add_executable(test_fragment_store - fragment_store/test_fragstore.cpp -) - -target_link_libraries(test_fragment_store PUBLIC - fragment_store -) - -######################################## - add_executable(convert_frag_to_obj fragment_store/convert_frag_to_obj.cpp ) diff --git a/src/fragment_store/fragment_store.cpp b/src/fragment_store/fragment_store.cpp deleted file mode 100644 index 2a34930..0000000 --- a/src/fragment_store/fragment_store.cpp +++ /dev/null @@ -1,745 +0,0 @@ -#include "./fragment_store.hpp" - -#include - -#include -#include -#include - -#include - -#include -#include - -#include "./file2_stack.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -static const char* metaFileTypeSuffix(MetaFileType mft) { - switch (mft) { - case MetaFileType::TEXT_JSON: return ".json"; - //case MetaFileType::BINARY_ARB: return ".bin"; - case MetaFileType::BINARY_MSGPACK: return ".msgpack"; - } - return ""; // .unk? -} - -// TODO: move to ... somewhere. (span? file2i?) -static ByteSpan spanFromRead(const std::variant>& data_var) { - if (std::holds_alternative>(data_var)) { - auto& vec = std::get>(data_var); - return {vec.data(), vec.size()}; - } else if (std::holds_alternative(data_var)) { - return std::get(data_var); - } else { - assert(false); - return {}; - } -} - -FragmentStore::FragmentStore(void) { - registerSerializers(); -} - -FragmentStore::FragmentStore( - std::array session_uuid_namespace -) : _session_uuid_gen(std::move(session_uuid_namespace)) { - registerSerializers(); -} - -std::vector FragmentStore::generateNewUID(void) { - return _session_uuid_gen(); -} - -FragmentID FragmentStore::newFragmentMemoryOwned( - const std::vector& id, - size_t initial_size -) { - { // first check if id is already used - auto exising_id = getFragmentByID(id); - if (_reg.valid(exising_id)) { - return entt::null; - } - } - - { // next check if space in memory budget - const auto free_memory = _memory_budget - _memory_usage; - if (initial_size > free_memory) { - return entt::null; - } - } - - // actually allocate and create - auto new_data = std::make_unique>(initial_size); - if (!static_cast(new_data)) { - // allocation failure - return entt::null; - } - _memory_usage += initial_size; - - const auto new_frag = _reg.create(); - - _reg.emplace(new_frag, id); - // TODO: memory comp - _reg.emplace>>(new_frag) = std::move(new_data); - - throwEventConstruct(new_frag); - - return new_frag; -} - -FragmentID FragmentStore::newFragmentFile( - std::string_view store_path, - MetaFileType mft, - const std::vector& id -) { - { // first check if id is already used - const auto exising_id = getFragmentByID(id); - if (_reg.valid(exising_id)) { - return entt::null; - } - } - - if (store_path.empty()) { - store_path = _default_store_path; - } - - std::filesystem::create_directories(store_path); - - const auto id_hex = bin2hex(id); - std::filesystem::path fragment_file_path; - - if (id_hex.size() < 6) { - fragment_file_path = std::filesystem::path{store_path}/id_hex; - } else { - // use the first 2hex (1byte) as a subfolder - std::filesystem::create_directories(std::string{store_path} + id_hex.substr(0, 2)); - fragment_file_path = std::filesystem::path{std::string{store_path} + id_hex.substr(0, 2)} / id_hex.substr(2); - } - - if (std::filesystem::exists(fragment_file_path)) { - return entt::null; - } - - const auto new_frag = _reg.create(); - - _reg.emplace(new_frag, id); - - // file (info) comp - _reg.emplace(new_frag, fragment_file_path.generic_u8string()); - - _reg.emplace(new_frag, mft); - - // meta needs to be synced to file - std::function empty_data_cb = [](auto*, auto) -> uint64_t { return 0; }; - if (!syncToStorage(new_frag, empty_data_cb)) { - std::cerr << "FS error: syncToStorage failed while creating new fragment file\n"; - _reg.destroy(new_frag); - return entt::null; - } - - // while new metadata might be created here, making sure the file could be created is more important - throwEventConstruct(new_frag); - - return new_frag; -} -FragmentID FragmentStore::newFragmentFile( - std::string_view store_path, - MetaFileType mft -) { - return newFragmentFile(store_path, mft, generateNewUID()); -} - -FragmentID FragmentStore::getFragmentByID( - const std::vector& id -) { - // TODO: accelerate - // maybe keep it sorted and binary search? hash table lookup? - for (const auto& [frag, id_comp] : _reg.view().each()) { - if (id == id_comp.v) { - return frag; - } - } - - return entt::null; -} - -FragmentID FragmentStore::getFragmentCustomMatcher( - std::function& fn -) { - return entt::null; -} - -bool FragmentStore::syncToStorage(FragmentID fid, std::function& data_cb) { - if (!_reg.valid(fid)) { - return false; - } - - if (!_reg.all_of(fid)) { - // not a file fragment? - return false; - } - - // split object storage - - MetaFileType meta_type = MetaFileType::TEXT_JSON; // TODO: better defaults - if (_reg.all_of(fid)) { - meta_type = _reg.get(fid).type; - } - - Encryption meta_enc = Encryption::NONE; // TODO: better defaults - Compression meta_comp = Compression::NONE; // TODO: better defaults - - if (meta_type != MetaFileType::TEXT_JSON) { - if (_reg.all_of(fid)) { - meta_enc = _reg.get(fid).enc; - } - - if (_reg.all_of(fid)) { - meta_comp = _reg.get(fid).comp; - } - } else { - // we cant have encryption or compression - // so we force NONE for TEXT JSON - - _reg.emplace_or_replace(fid, Encryption::NONE); - _reg.emplace_or_replace(fid, Compression::NONE); - } - - std::filesystem::path meta_tmp_path = _reg.get(fid).path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp"; - meta_tmp_path.replace_filename("." + meta_tmp_path.filename().generic_u8string()); - // TODO: make meta comp work with mem compressor - //auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp); - std::stack> meta_file_stack; - meta_file_stack.push(std::make_unique(std::string_view{meta_tmp_path.generic_u8string()})); - - if (meta_file_stack.empty()) { - std::cerr << "FS error: failed to create temporary meta file stack\n"; - std::filesystem::remove(meta_tmp_path); // might have created an empty file - return false; - } - - Encryption data_enc = Encryption::NONE; // TODO: better defaults - Compression data_comp = Compression::NONE; // TODO: better defaults - if (_reg.all_of(fid)) { - data_enc = _reg.get(fid).enc; - } - if (_reg.all_of(fid)) { - data_comp = _reg.get(fid).comp; - } - - std::filesystem::path data_tmp_path = _reg.get(fid).path + ".tmp"; - data_tmp_path.replace_filename("." + data_tmp_path.filename().generic_u8string()); - auto data_file_stack = buildFileStackWrite(std::string_view{data_tmp_path.generic_u8string()}, data_enc, data_comp); - if (data_file_stack.empty()) { - while (!meta_file_stack.empty()) { meta_file_stack.pop(); } - std::filesystem::remove(meta_tmp_path); - std::cerr << "FS error: failed to create temporary data file stack\n"; - return false; - } - - try { // TODO: properly sanitize strings, so this cant throw - // sharing code between binary msgpack and text json for now - nlohmann::json meta_data_j = nlohmann::json::object(); // metadata needs to be an object, null not allowed - // metadata file - - for (const auto& [type_id, storage] : _reg.storage()) { - if (!storage.contains(fid)) { - continue; - } - - //std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n"; - - // use type_id to find serializer - auto s_cb_it = _sc._serl_json.find(type_id); - if (s_cb_it == _sc._serl_json.end()) { - // could not find serializer, not saving - continue; - } - - // noooo, why cant numbers be keys - //if (meta_type == MetaFileType::BINARY_MSGPACK) { // msgpack uses the hash id instead - //s_cb_it->second(storage.value(fid), meta_data[storage.type().hash()]); - //} else if (meta_type == MetaFileType::TEXT_JSON) { - s_cb_it->second({_reg, fid}, meta_data_j[storage.type().name()]); - //} - } - - if (meta_type == MetaFileType::BINARY_MSGPACK) { // binary metadata file - std::vector binary_meta_data; - { - std::stack> binary_writer_stack; - binary_writer_stack.push(std::make_unique(binary_meta_data)); - - if (!buildStackWrite(binary_writer_stack, meta_enc, meta_comp)) { - while (!meta_file_stack.empty()) { meta_file_stack.pop(); } - std::filesystem::remove(meta_tmp_path); - while (!data_file_stack.empty()) { data_file_stack.pop(); } - std::filesystem::remove(data_tmp_path); - std::cerr << "FS error: binary writer creation failed '" << _reg.get(fid).path << "'\n"; - return false; - } - - { - const std::vector meta_data = nlohmann::json::to_msgpack(meta_data_j); - if (!binary_writer_stack.top()->write(ByteSpan{meta_data})) { - // i feel like exceptions or refactoring would be nice here - while (!meta_file_stack.empty()) { meta_file_stack.pop(); } - std::filesystem::remove(meta_tmp_path); - while (!data_file_stack.empty()) { data_file_stack.pop(); } - std::filesystem::remove(data_tmp_path); - std::cerr << "FS error: binary writer failed '" << _reg.get(fid).path << "'\n"; - return false; - } - } - } - - //// the meta file is itself msgpack data - nlohmann::json meta_header_j = nlohmann::json::array(); - meta_header_j.emplace_back() = "SOLMET"; - meta_header_j.push_back(meta_enc); - meta_header_j.push_back(meta_comp); - - // with a custom msgpack impl like cmp, we can be smarter here and dont need an extra buffer - meta_header_j.push_back(nlohmann::json::binary(binary_meta_data)); - - const auto meta_header_data = nlohmann::json::to_msgpack(meta_header_j); - meta_file_stack.top()->write({meta_header_data.data(), meta_header_data.size()}); - } else if (meta_type == MetaFileType::TEXT_JSON) { - // cant be compressed or encrypted - const auto meta_file_json_str = meta_data_j.dump(2, ' ', true); - meta_file_stack.top()->write({reinterpret_cast(meta_file_json_str.data()), meta_file_json_str.size()}); - } - - } catch (...) { - while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope? - std::filesystem::remove(meta_tmp_path); - while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope? - std::filesystem::remove(data_tmp_path); - std::cerr << "FS error: failed to serialize json data\n"; - return false; - } - - // now data - // for zstd compression, chunk size is frame size. (no cross frame referencing) - // TODO: add buffering steam layer - static constexpr int64_t chunk_size{1024*1024}; // 1MiB should be enough - std::vector buffer(chunk_size); - uint64_t buffer_actual_size {0}; - do { - buffer_actual_size = data_cb(buffer.data(), buffer.size()); - if (buffer_actual_size == 0) { - break; - } - if (buffer_actual_size > buffer.size()) { - // wtf - break; - } - - data_file_stack.top()->write({buffer.data(), buffer_actual_size}); - } while (buffer_actual_size == buffer.size()); - - //meta_file.flush(); - //meta_file.close(); - while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope? - //data_file.flush(); - //data_file.close(); - while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope? - - std::filesystem::rename( - meta_tmp_path, - _reg.get(fid).path + ".meta" + metaFileTypeSuffix(meta_type) - ); - - std::filesystem::rename( - data_tmp_path, - _reg.get(fid).path - ); - - // TODO: check return value of renames - - if (_reg.all_of(fid)) { - _reg.remove(fid); - } - - return true; -} - -bool FragmentStore::syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size) { - std::function fn_cb = [read = 0ull, data, data_size](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t { - uint64_t i = 0; - for (; i+read < data_size && i < buffer_size; i++) { - request_buffer[i] = data[i+read]; - } - read += i; - - return i; - }; - return syncToStorage(fid, fn_cb); -} - -bool FragmentStore::loadFromStorage(FragmentID fid, std::function& data_cb) { - if (!_reg.valid(fid)) { - return false; - } - - if (!_reg.all_of(fid)) { - // not a file fragment? - // TODO: memory fragments - return false; - } - - const auto& frag_path = _reg.get(fid).path; - - // TODO: check if metadata dirty? - // TODO: what if file changed on disk? - - std::cout << "FS: loading fragment '" << frag_path << "'\n"; - - Compression data_comp = Compression::NONE; - if (_reg.all_of(fid)) { - data_comp = _reg.get(fid).comp; - } - - auto data_file_stack = buildFileStackRead(std::string_view{frag_path}, Encryption::NONE, data_comp); - if (data_file_stack.empty()) { - return false; - } - - // TODO: make it read in a single chunk instead? - static constexpr int64_t chunk_size {1024 * 1024}; // 1MiB should be good for read - do { - auto data_var = data_file_stack.top()->read(chunk_size); - ByteSpan data = spanFromRead(data_var); - - if (data.empty()) { - // error or probably eof - break; - } - - data_cb(data); - - if (data.size < chunk_size) { - // eof - break; - } - } while (data_file_stack.top()->isGood()); - - return true; -} - -nlohmann::json FragmentStore::loadFromStorageNJ(FragmentID fid) { - std::vector tmp_buffer; - std::function cb = [&tmp_buffer](const ByteSpan buffer) { - tmp_buffer.insert(tmp_buffer.end(), buffer.cbegin(), buffer.cend()); - }; - - if (!loadFromStorage(fid, cb)) { - return nullptr; - } - - return nlohmann::json::parse(tmp_buffer); -} - -size_t FragmentStore::scanStoragePath(std::string_view path) { - if (path.empty()) { - path = _default_store_path; - } - // TODO: extract so async can work (or/and make iteratable generator) - - if (!std::filesystem::is_directory(path)) { - std::cerr << "FS error: scan path not a directory '" << path << "'\n"; - return 0; - } - - // step 1: make snapshot of files, validate metafiles and save id/path+meta.ext - // can be extra thread (if non vfs) - struct FragFileEntry { - std::string id_str; - std::filesystem::path frag_path; - std::string meta_ext; - - bool operator==(const FragFileEntry& other) const { - // only compare by id - return id_str == other.id_str; - } - }; - struct FragFileEntryHash { - size_t operator()(const FragFileEntry& it) const { - return entt::hashed_string(it.id_str.data(), it.id_str.size()); - } - }; - entt::dense_set file_frag_list; - - std::filesystem::path storage_path{path}; - - auto handle_file = [&](const std::filesystem::path& file_path) { - if (!std::filesystem::is_regular_file(file_path)) { - return; - } - // handle file - - if (file_path.has_extension()) { - // skip over metadata, assuming only metafiles have extentions (might be wrong?) - // also skips temps - return; - } - - auto relative_path = std::filesystem::proximate(file_path, storage_path); - std::string id_str = relative_path.generic_u8string(); - // delete all '/' - id_str.erase(std::remove(id_str.begin(), id_str.end(), '/'), id_str.end()); - if (id_str.size() % 2 != 0) { - std::cerr << "FS error: non hex fragment uid detected: '" << id_str << "'\n"; - } - - if (file_frag_list.contains(FragFileEntry{id_str, {}, ""})) { - std::cerr << "FS error: fragment duplicate detected: '" << id_str << "'\n"; - return; // skip - } - - const char* meta_ext = ".meta.msgpack"; - { // find meta - // TODO: this as to know all possible extentions - bool has_meta_msgpack = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.msgpack"); - bool has_meta_json = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.json"); - const size_t meta_sum = - (has_meta_msgpack?1:0) + - (has_meta_json?1:0) - ; - - if (meta_sum > 1) { // has multiple - std::cerr << "FS error: fragment with multiple meta files detected: " << id_str << "\n"; - return; // skip - } - - if (meta_sum == 0) { - std::cerr << "FS error: fragment missing meta file detected: " << id_str << "\n"; - return; // skip - } - - if (has_meta_json) { - meta_ext = ".meta.json"; - } - } - - file_frag_list.emplace(FragFileEntry{ - std::move(id_str), - file_path, - meta_ext - }); - }; - - for (const auto& outer_path : std::filesystem::directory_iterator(storage_path)) { - if (std::filesystem::is_regular_file(outer_path)) { - handle_file(outer_path); - } else if (std::filesystem::is_directory(outer_path)) { - // subdir, part of id - for (const auto& inner_path : std::filesystem::directory_iterator(outer_path)) { - //if (std::filesystem::is_regular_file(inner_path)) { - - //// handle file - //} // TODO: support deeper recursion? - handle_file(inner_path); - } - } - } - - std::cout << "FS: scan found:\n"; - for (const auto& it : file_frag_list) { - std::cout << " " << it.id_str << "\n"; - } - - // step 2: check if files preexist in reg - // main thread - // (merge into step 3 and do more error checking?) - for (auto it = file_frag_list.begin(); it != file_frag_list.end();) { - auto id = hex2bin(it->id_str); - auto fid = getFragmentByID(id); - if (_reg.valid(fid)) { - // pre exising (handle differently??) - // check if store differs? - it = file_frag_list.erase(it); - } else { - it++; - } - } - - std::vector scanned_frags; - // step 3: parse meta and insert into reg of non preexising - // main thread - // TODO: check timestamps of preexisting and reload? mark external/remote dirty? - for (const auto& it : file_frag_list) { - nlohmann::json j; - if (it.meta_ext == ".meta.msgpack") { - std::ifstream file(it.frag_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary); - if (!file.is_open()) { - std::cout << "FS error: failed opening meta " << it.frag_path << "\n"; - continue; - } - - // file is a msgpack within a msgpack - - std::vector full_meta_data; - { // read meta file - // figure out size - file.seekg(0, file.end); - uint64_t file_size = file.tellg(); - file.seekg(0, file.beg); - - full_meta_data.resize(file_size); - - file.read(reinterpret_cast(full_meta_data.data()), full_meta_data.size()); - } - - const auto meta_header_j = nlohmann::json::from_msgpack(full_meta_data, true, false); - - if (!meta_header_j.is_array() || meta_header_j.size() < 4) { - std::cerr << "FS error: broken binary meta " << it.frag_path << "\n"; - continue; - } - - if (meta_header_j.at(0) != "SOLMET") { - std::cerr << "FS error: wrong magic '" << meta_header_j.at(0) << "' in meta " << it.frag_path << "\n"; - continue; - } - - Encryption meta_enc = meta_header_j.at(1); - if (meta_enc != Encryption::NONE) { - std::cerr << "FS error: unknown encryption " << it.frag_path << "\n"; - continue; - } - - Compression meta_comp = meta_header_j.at(2); - if (meta_comp != Compression::NONE && meta_comp != Compression::ZSTD) { - std::cerr << "FS error: unknown compression " << it.frag_path << "\n"; - continue; - } - - //const auto& meta_data_ref = meta_header_j.at(3).is_binary()?meta_header_j.at(3):meta_header_j.at(3).at("data"); - if (!meta_header_j.at(3).is_binary()) { - std::cerr << "FS error: meta data not binary " << it.frag_path << "\n"; - continue; - } - const nlohmann::json::binary_t& meta_data_ref = meta_header_j.at(3); - - std::stack> binary_reader_stack; - binary_reader_stack.push(std::make_unique(ByteSpan{meta_data_ref.data(), meta_data_ref.size()})); - - if (!buildStackRead(binary_reader_stack, meta_enc, meta_comp)) { - std::cerr << "FS error: binary reader creation failed " << it.frag_path << "\n"; - continue; - } - - // HACK: read fixed amout of data, but this way if we have neither enc nor comp we pass the span through - auto binary_read_value = binary_reader_stack.top()->read(10*1024*1024); // is 10MiB large enough for meta? - const auto binary_read_span = spanFromRead(binary_read_value); - assert(binary_read_span.size < 10*1024*1024); - - j = nlohmann::json::from_msgpack(binary_read_span, true, false); - } else if (it.meta_ext == ".meta.json") { - std::ifstream file(it.frag_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary); - if (!file.is_open()) { - std::cerr << "FS error: failed opening meta " << it.frag_path << "\n"; - continue; - } - - file >> j; - } else { - assert(false); - } - - if (!j.is_object()) { - std::cerr << "FS error: json in meta is broken " << it.id_str << "\n"; - continue; - } - - // TODO: existing fragment file - //newFragmentFile(); - FragmentHandle fh{_reg, _reg.create()}; - fh.emplace(hex2bin(it.id_str)); - - fh.emplace(it.frag_path.generic_u8string()); - - for (const auto& [k, v] : j.items()) { - // type id from string hash - const auto type_id = entt::hashed_string(k.data(), k.size()); - const auto deserl_fn_it = _sc._deserl_json.find(type_id); - if (deserl_fn_it != _sc._deserl_json.cend()) { - // TODO: check return value - deserl_fn_it->second(fh, v); - } else { - std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n"; - } - } - scanned_frags.push_back(fh); - } - - // TODO: mutex and move code to async and return this list ? - - // throw new frag event here, after loading them all - for (const FragmentID fid : scanned_frags) { - throwEventConstruct(fid); - } - - return scanned_frags.size(); -} - -void FragmentStore::scanStoragePathAsync(std::string path) { - // add path to queue - // HACK: if path is known/any fragment is in the path, this operation blocks (non async) - scanStoragePath(path); // TODO: make async and post result -} - -static bool serl_json_data_enc_type(const FragmentHandle fh, nlohmann::json& out) { - out = static_cast>( - fh.get().enc - ); - return true; -} - -static bool deserl_json_data_enc_type(FragmentHandle fh, const nlohmann::json& in) { - fh.emplace_or_replace( - static_cast( - static_cast>(in) - ) - ); - return true; -} - -static bool serl_json_data_comp_type(const FragmentHandle fh, nlohmann::json& out) { - out = static_cast>( - fh.get().comp - ); - return true; -} - -static bool deserl_json_data_comp_type(FragmentHandle fh, const nlohmann::json& in) { - fh.emplace_or_replace( - static_cast( - static_cast>(in) - ) - ); - return true; -} - -void FragmentStore::registerSerializers(void) { - _sc.registerSerializerJson(serl_json_data_enc_type); - _sc.registerDeSerializerJson(deserl_json_data_enc_type); - _sc.registerSerializerJson(serl_json_data_comp_type); - _sc.registerDeSerializerJson(deserl_json_data_comp_type); -} - diff --git a/src/fragment_store/fragment_store.hpp b/src/fragment_store/fragment_store.hpp deleted file mode 100644 index 5982f70..0000000 --- a/src/fragment_store/fragment_store.hpp +++ /dev/null @@ -1,98 +0,0 @@ -#pragma once - -#include - -#include "./fragment_store_i.hpp" - -#include "./types.hpp" -#include "./meta_components.hpp" - -#include "./serializer.hpp" - -#include "./uuid_generator.hpp" - -#include -#include - -#include - -#include -#include -#include -#include - -struct FragmentStore : public FragmentStoreI { - UUIDGenerator_128_128 _session_uuid_gen; - - std::string _default_store_path; - - uint64_t _memory_budget {10u*1024u*1024u}; - uint64_t _memory_usage {0u}; - - SerializerCallbacks _sc; - - FragmentStore(void); - FragmentStore(std::array session_uuid_namespace); - - // TODO: make the frags ref counted - - // TODO: check for exising - std::vector generateNewUID(void); - - // ========== new fragment ========== - - // memory backed owned - FragmentID newFragmentMemoryOwned( - const std::vector& id, - size_t initial_size - ); - - // memory backed view (can only be added? not new?) - - // file backed (rw...) - // needs to know which store path to put into - FragmentID newFragmentFile( - std::string_view store_path, - MetaFileType mft, - const std::vector& id - ); - // this variant generate a new, mostly unique, id for us - FragmentID newFragmentFile( - std::string_view store_path, - MetaFileType mft - ); - - // ========== add fragment ========== - - // ========== get fragment ========== - FragmentID getFragmentByID( - const std::vector& id - ); - FragmentID getFragmentCustomMatcher( - std::function& fn - ); - - // remove fragment? - // unload? - - // ========== sync fragment to storage ========== - using write_to_storage_fetch_data_cb = uint64_t(uint8_t* request_buffer, uint64_t buffer_size); - // calls data_cb with a buffer to be filled in, cb returns actual count of data. if returned < max, its the last buffer. - bool syncToStorage(FragmentID fid, std::function& data_cb); - bool syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size); - - // ========== load fragment data from storage ========== - using read_from_storage_put_data_cb = void(const ByteSpan buffer); - bool loadFromStorage(FragmentID fid, std::function& data_cb); - // convenience function - nlohmann::json loadFromStorageNJ(FragmentID fid); - - // fragment discovery? - // returns number of new fragments - size_t scanStoragePath(std::string_view path); - void scanStoragePathAsync(std::string path); - - private: - void registerSerializers(void); // internal comps -}; - diff --git a/src/fragment_store/fragment_store_i.cpp b/src/fragment_store/fragment_store_i.cpp deleted file mode 100644 index 3f757fd..0000000 --- a/src/fragment_store/fragment_store_i.cpp +++ /dev/null @@ -1,31 +0,0 @@ -#include "./fragment_store_i.hpp" - -#include - -FragmentRegistry& FragmentStoreI::registry(void) { - return _reg; -} - -FragmentHandle FragmentStoreI::fragmentHandle(const FragmentID fid) { - return {_reg, fid}; -} - -void FragmentStoreI::throwEventConstruct(const FragmentID fid) { - std::cout << "FSI debug: event construct " << entt::to_integral(fid) << "\n"; - dispatch( - FragmentStore_Event::fragment_construct, - Fragment::Events::FragmentConstruct{ - FragmentHandle{_reg, fid} - } - ); -} - -void FragmentStoreI::throwEventUpdate(const FragmentID fid) { - std::cout << "FSI debug: event updated " << entt::to_integral(fid) << "\n"; - dispatch( - FragmentStore_Event::fragment_updated, - Fragment::Events::FragmentUpdated{ - FragmentHandle{_reg, fid} - } - ); -} diff --git a/src/fragment_store/fragment_store_i.hpp b/src/fragment_store/fragment_store_i.hpp deleted file mode 100644 index 774a766..0000000 --- a/src/fragment_store/fragment_store_i.hpp +++ /dev/null @@ -1,63 +0,0 @@ -#pragma once - -#include - -#include -#include - -#include - -// internal id -enum class FragmentID : uint32_t {}; -using FragmentRegistry = entt::basic_registry; -using FragmentHandle = entt::basic_handle; - -namespace Fragment::Events { - - struct FragmentConstruct { - const FragmentHandle e; - }; - struct FragmentUpdated { - const FragmentHandle e; - }; - //struct MessageDestory { - //const Message3Handle e; - //}; - -} // Fragment::Events - -enum class FragmentStore_Event : uint32_t { - fragment_construct, - fragment_updated, - //message_destroy, - - MAX -}; - -struct FragmentStoreEventI { - using enumType = FragmentStore_Event; - - virtual ~FragmentStoreEventI(void) {} - - virtual bool onEvent(const Fragment::Events::FragmentConstruct&) { return false; } - virtual bool onEvent(const Fragment::Events::FragmentUpdated&) { return false; } - //virtual bool onEvent(const Fragment::Events::MessageDestory&) { return false; } -}; -using FragmentStoreEventProviderI = EventProviderI; - -struct FragmentStoreI : public FragmentStoreEventProviderI { - static constexpr const char* version {"1"}; - - FragmentRegistry _reg; - - virtual ~FragmentStoreI(void) {} - - FragmentRegistry& registry(void); - FragmentHandle fragmentHandle(const FragmentID fid); - - void throwEventConstruct(const FragmentID fid); - void throwEventUpdate(const FragmentID fid); - // TODO - //void throwEventDestroy(); -}; - diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index ce0b1eb..c44b511 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -1,5 +1,7 @@ #include "./message_fragment_store.hpp" +#include "./serializer_json.hpp" + #include "../json/message_components.hpp" #include @@ -28,8 +30,8 @@ namespace Message::Components { ////std::vector uid; //FragmentID id; //}; - // only contains fragments with <1024 messages and <28h tsrage (or whatever) - entt::dense_set fid_open; + // only contains fragments with <1024 messages and <2h tsrage (or whatever) + entt::dense_set fid_open; }; // all message fragments of this contact @@ -40,30 +42,30 @@ namespace Message::Components { size_t i_b; size_t i_e; }; - entt::dense_map frags; + entt::dense_map frags; // add 2 sorted contact lists for both range begin and end // TODO: adding and removing becomes expensive with enough frags, consider splitting or heap - std::vector sorted_begin; - std::vector sorted_end; + std::vector sorted_begin; + std::vector sorted_end; // api // return true if it was actually inserted - bool insert(FragmentHandle frag); - bool erase(FragmentID frag); + bool insert(ObjectHandle frag); + bool erase(Object frag); // update? (just erase() + insert()) // uses range begin to go back in time - FragmentID prev(FragmentID frag) const; + Object prev(Object frag) const; // uses range end to go forward in time - FragmentID next(FragmentID frag) const; + Object next(Object frag) const; }; // all LOADED message fragments // TODO: merge into ContactFragments (and pull in openfrags) struct LoadedContactFragments final { // kept up-to-date by events - entt::dense_set frags; + entt::dense_set frags; }; } // Message::Components @@ -84,6 +86,23 @@ namespace ObjectStore::Components { } } // ObjectStore::Component +static nlohmann::json loadFromStorageNJ(ObjectHandle oh) { + assert(oh.all_of()); + auto* backend = oh.get().ptr; + assert(backend != nullptr); + + std::vector tmp_buffer; + std::function cb = [&tmp_buffer](const ByteSpan buffer) { + tmp_buffer.insert(tmp_buffer.end(), buffer.cbegin(), buffer.cend()); + }; + if (!backend->read(oh, cb)) { + std::cerr << "failed to read obj '" << bin2hex(oh.get().v) << "'\n"; + return false; + } + + return nlohmann::json::parse(tmp_buffer, nullptr, false); +} + void MessageFragmentStore::handleMessage(const Message3Handle& m) { if (_fs_ignore_event) { // message event because of us loading a fragment, ignore @@ -113,8 +132,8 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { } // TODO: use fid, seving full fuid for every message consumes alot of memory (and heap frag) - if (!m.all_of()) { - std::cout << "MFS: new msg missing FID\n"; + if (!m.all_of()) { + std::cout << "MFS: new msg missing Object\n"; if (!m.registry()->ctx().contains()) { m.registry()->ctx().emplace(); } @@ -125,11 +144,11 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // missing fuid // find closesed non-sealed off fragment - FragmentID fragment_id{entt::null}; + Object fragment_id{entt::null}; // first search for fragment where the ts falls into the range for (const auto& fid : fid_open) { - auto fh = _fs.fragmentHandle(fid); + auto fh = _os.objectHandle(fid); assert(static_cast(fh)); // assuming ts range exists @@ -143,9 +162,9 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { } // if it did not fit into an existing fragment, we next look for fragments that could be extended - if (!_fs._reg.valid(fragment_id)) { + if (!_os._reg.valid(fragment_id)) { for (const auto& fid : fid_open) { - auto fh = _fs.fragmentHandle(fid); + auto fh = _os.objectHandle(fid); assert(static_cast(fh)); // assuming ts range exists @@ -195,11 +214,13 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { } // if its still not found, we need a new fragment - if (!_fs._reg.valid(fragment_id)) { - const auto new_fid = _fs.newFragmentFile("test_message_store/", MetaFileType::BINARY_MSGPACK); - auto fh = _fs.fragmentHandle(new_fid); + if (!_os.registry().valid(fragment_id)) { + const auto new_uuid = _session_uuid_gen(); + _fs_ignore_event = true; + auto fh = _sb.newObject(ByteSpan{new_uuid}); + _fs_ignore_event = false; if (!static_cast(fh)) { - std::cout << "MFS error: failed to create new fragment for message\n"; + std::cout << "MFS error: failed to create new object for message\n"; return; } @@ -238,17 +259,17 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { std::cout << "MFS: created new fragment " << bin2hex(fh.get().v) << "\n"; _fs_ignore_event = true; - _fs.throwEventConstruct(fh); + _os.throwEventConstruct(fh); _fs_ignore_event = false; } // if this is still empty, something is very wrong and we exit here - if (!_fs._reg.valid(fragment_id)) { + if (!_os.registry().valid(fragment_id)) { std::cout << "MFS error: failed to find/create fragment for message\n"; return; } - m.emplace_or_replace(fragment_id); + m.emplace_or_replace(fragment_id); // in this case we know the fragment needs an update for (const auto& it : _fuid_save_queue) { @@ -257,11 +278,11 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { return; // done } } - _fuid_save_queue.push_back({Message::getTimeMS(), fragment_id, m.registry()}); + _fuid_save_queue.push_back({Message::getTimeMS(), {_os.registry(), fragment_id}, m.registry()}); return; // done } - const auto msg_fh = _fs.fragmentHandle(m.get().fid); + const auto msg_fh = _os.objectHandle(m.get().o); if (!static_cast(msg_fh)) { std::cerr << "MFS error: fid in message is invalid\n"; return; // TODO: properly handle this case @@ -290,9 +311,9 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // assumes not loaded frag // need update from frag -void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh) { +void MessageFragmentStore::loadFragment(Message3Registry& reg, ObjectHandle fh) { std::cout << "MFS: loadFragment\n"; - const auto j = _fs.loadFromStorageNJ(fh); + const auto j = loadFromStorageNJ(fh); if (!j.is_array()) { // wrong data @@ -339,7 +360,7 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh } } - new_real_msg.emplace_or_replace(fh); + new_real_msg.emplace_or_replace(fh); // dup check (hacky, specific to protocols) Message3 dup_msg {entt::null}; @@ -393,7 +414,7 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh } } -bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry& reg) { +bool MessageFragmentStore::syncFragToStorage(ObjectHandle fh, Message3Registry& reg) { auto& ftsrange = fh.get_or_emplace(Message::getTimeMS(), Message::getTimeMS()); auto j = nlohmann::json::array(); @@ -404,7 +425,7 @@ bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry for (auto it = msg_view.rbegin(), it_end = msg_view.rend(); it != it_end; it++) { const Message3 m = *it; - if (!reg.all_of(m)) { + if (!reg.all_of(m)) { continue; } @@ -413,7 +434,7 @@ bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry continue; } - if (_fuid_save_queue.front().id != reg.get(m).fid) { + if (_fuid_save_queue.front().id != reg.get(m).o) { continue; // not ours } @@ -452,10 +473,13 @@ bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry // if save as binary //nlohmann::json::to_msgpack(j); auto j_dump = j.dump(2, ' ', true); - if (_fs.syncToStorage(fh, reinterpret_cast(j_dump.data()), j_dump.size())) { + assert(fh.all_of()); + auto* backend = fh.get().ptr; + //if (_os.syncToStorage(fh, reinterpret_cast(j_dump.data()), j_dump.size())) { + if (backend->write(fh, {reinterpret_cast(j_dump.data()), j_dump.size()})) { // TODO: make this better, should this be called on fail? should this be called before sync? (prob not) _fs_ignore_event = true; - _fs.throwEventUpdate(fh); + _os.throwEventUpdate(fh); _fs_ignore_event = false; //std::cout << "MFS: dumped " << j_dump << "\n"; @@ -470,30 +494,32 @@ bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry MessageFragmentStore::MessageFragmentStore( Contact3Registry& cr, RegistryMessageModel& rmm, - FragmentStore& fs -) : _cr(cr), _rmm(rmm), _fs(fs), _sc{_cr, {}, {}} { + ObjectStore2& os, + StorageBackendI& sb +) : _cr(cr), _rmm(rmm), _os(os), _sb(sb), _sc{_cr, {}, {}} { _rmm.subscribe(this, RegistryMessageModel_Event::message_construct); _rmm.subscribe(this, RegistryMessageModel_Event::message_updated); _rmm.subscribe(this, RegistryMessageModel_Event::message_destroy); - _fs._sc.registerSerializerJson(); - _fs._sc.registerDeSerializerJson(); - _fs._sc.registerSerializerJson(); - _fs._sc.registerDeSerializerJson(); + auto& sjc = _os.registry().ctx().get>(); + sjc.registerSerializer(); + sjc.registerDeSerializer(); + sjc.registerSerializer(); + sjc.registerDeSerializer(); // old - _fs._sc.registerSerializerJson(_fs._sc.component_get_json); - _fs._sc.registerDeSerializerJson(_fs._sc.component_emplace_or_replace_json); - _fs._sc.registerSerializerJson(_fs._sc.component_get_json); - _fs._sc.registerDeSerializerJson(_fs._sc.component_emplace_or_replace_json); + sjc.registerSerializer(sjc.component_get_json); + sjc.registerDeSerializer(sjc.component_emplace_or_replace_json); + sjc.registerSerializer(sjc.component_get_json); + sjc.registerDeSerializer(sjc.component_emplace_or_replace_json); - _fs.subscribe(this, FragmentStore_Event::fragment_construct); - _fs.subscribe(this, FragmentStore_Event::fragment_updated); + _os.subscribe(this, ObjectStore_Event::object_construct); + _os.subscribe(this, ObjectStore_Event::object_update); } MessageFragmentStore::~MessageFragmentStore(void) { while (!_fuid_save_queue.empty()) { - auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id); + auto fh = _fuid_save_queue.front().id; auto* reg = _fuid_save_queue.front().reg; assert(reg != nullptr); syncFragToStorage(fh, *reg); @@ -570,7 +596,7 @@ float MessageFragmentStore::tick(float) { if (!_fuid_save_queue.empty()) { // wait 10sec before saving if (_fuid_save_queue.front().ts_since_dirty + 10*1000 <= ts_now) { - auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id); + auto fh = _fuid_save_queue.front().id; auto* reg = _fuid_save_queue.front().reg; assert(reg != nullptr); if (syncFragToStorage(fh, *reg)) { @@ -587,7 +613,7 @@ float MessageFragmentStore::tick(float) { const bool had_events = !_event_check_queue.empty(); for (size_t i = 0; i < 10 && !_event_check_queue.empty(); i++) { std::cout << "MFS: event check\n"; - auto fh = _fs.fragmentHandle(_event_check_queue.front().fid); + auto fh = _event_check_queue.front().fid; auto c = _event_check_queue.front().c; _event_check_queue.pop_front(); @@ -643,7 +669,7 @@ float MessageFragmentStore::tick(float) { continue; } - auto fh = _fs.fragmentHandle(fid); + auto fh = _os.objectHandle(fid); if (!static_cast(fh)) { std::cerr << "MFS error: frag is invalid\n"; @@ -696,25 +722,25 @@ float MessageFragmentStore::tick(float) { cf.sorted_end.crbegin(), cf.sorted_end.crend(), ts_begin_comp.ts, - [&](const FragmentID element, const auto& value) -> bool { - return _fs._reg.get(element).end >= value; + [&](const Object element, const auto& value) -> bool { + return _os.registry().get(element).end >= value; } ); - FragmentID next_frag{entt::null}; + Object next_frag{entt::null}; if (right != cf.sorted_end.crend()) { next_frag = cf.next(*right); } // we checked earlier that cf is not empty - if (!_fs._reg.valid(next_frag)) { + if (!_os.registry().valid(next_frag)) { // fall back to closest, cf is not empty next_frag = cf.sorted_end.front(); } // a single adjacent frag is often not enough // only ok bc next is cheap - for (size_t i = 0; i < 5 && _fs._reg.valid(next_frag); next_frag = cf.next(next_frag)) { - auto fh = _fs.fragmentHandle(next_frag); + for (size_t i = 0; i < 5 && _os.registry().valid(next_frag); next_frag = cf.next(next_frag)) { + auto fh = _os.objectHandle(next_frag); if (fh.any_of()) { continue; // skip known empty } @@ -744,25 +770,25 @@ float MessageFragmentStore::tick(float) { cf.sorted_begin.cbegin(), cf.sorted_begin.cend(), ts_end, - [&](const FragmentID element, const auto& value) -> bool { - return _fs._reg.get(element).begin < value; + [&](const Object element, const auto& value) -> bool { + return _os.registry().get(element).begin < value; } ); - FragmentID prev_frag{entt::null}; + Object prev_frag{entt::null}; if (left != cf.sorted_begin.cend()) { prev_frag = cf.prev(*left); } // we checked earlier that cf is not empty - if (!_fs._reg.valid(prev_frag)) { + if (!_os.registry().valid(prev_frag)) { // fall back to closest, cf is not empty prev_frag = cf.sorted_begin.back(); } // a single adjacent frag is often not enough // only ok bc next is cheap - for (size_t i = 0; i < 5 && _fs._reg.valid(prev_frag); prev_frag = cf.prev(prev_frag)) { - auto fh = _fs.fragmentHandle(prev_frag); + for (size_t i = 0; i < 5 && _os.registry().valid(prev_frag); prev_frag = cf.prev(prev_frag)) { + auto fh = _os.objectHandle(prev_frag); if (fh.any_of()) { continue; // skip known empty } @@ -791,10 +817,6 @@ float MessageFragmentStore::tick(float) { return 1000.f*60.f*60.f; } -void MessageFragmentStore::triggerScan(void) { - _fs.scanStoragePath("test_message_store/"); -} - bool MessageFragmentStore::onEvent(const Message::Events::MessageConstruct& e) { handleMessage(e.e); return false; @@ -807,7 +829,7 @@ bool MessageFragmentStore::onEvent(const Message::Events::MessageUpdated& e) { // TODO: handle deletes? diff between unload? -bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) { +bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectConstruct& e) { if (_fs_ignore_event) { return false; // skip self } @@ -854,7 +876,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) return false; } -bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentUpdated& e) { +bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectUpdate& e) { if (_fs_ignore_event) { return false; // skip self } @@ -911,7 +933,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentUpdated& e) { return false; } -bool Message::Components::ContactFragments::insert(FragmentHandle frag) { +bool Message::Components::ContactFragments::insert(ObjectHandle frag) { if (frags.contains(frag)) { return false; } @@ -926,7 +948,7 @@ bool Message::Components::ContactFragments::insert(FragmentHandle frag) { const auto pos = std::find_if( sorted_begin.cbegin(), sorted_begin.cend(), - [frag](const FragmentID a) -> bool { + [frag](const Object a) -> bool { const auto begin_a = frag.registry()->get(a).begin; const auto begin_frag = frag.get().begin; if (begin_a > begin_frag) { @@ -951,7 +973,7 @@ bool Message::Components::ContactFragments::insert(FragmentHandle frag) { const auto pos = std::find_if_not( sorted_end.cbegin(), sorted_end.cend(), - [frag](const FragmentID a) -> bool { + [frag](const Object a) -> bool { const auto end_a = frag.registry()->get(a).end; const auto end_frag = frag.get().end; if (end_a > end_frag) { @@ -984,7 +1006,7 @@ bool Message::Components::ContactFragments::insert(FragmentHandle frag) { return true; } -bool Message::Components::ContactFragments::erase(FragmentID frag) { +bool Message::Components::ContactFragments::erase(Object frag) { auto frags_it = frags.find(frag); if (frags_it == frags.end()) { return false; @@ -1001,7 +1023,7 @@ bool Message::Components::ContactFragments::erase(FragmentID frag) { return true; } -FragmentID Message::Components::ContactFragments::prev(FragmentID frag) const { +Object Message::Components::ContactFragments::prev(Object frag) const { // uses range begin to go back in time auto it = frags.find(frag); @@ -1017,7 +1039,7 @@ FragmentID Message::Components::ContactFragments::prev(FragmentID frag) const { return entt::null; } -FragmentID Message::Components::ContactFragments::next(FragmentID frag) const { +Object Message::Components::ContactFragments::next(Object frag) const { // uses range end to go forward in time auto it = frags.find(frag); diff --git a/src/fragment_store/message_fragment_store.hpp b/src/fragment_store/message_fragment_store.hpp index 179ece1..91f355d 100644 --- a/src/fragment_store/message_fragment_store.hpp +++ b/src/fragment_store/message_fragment_store.hpp @@ -1,12 +1,12 @@ #pragma once #include "./meta_components.hpp" -#include "./fragment_store_i.hpp" -#include "./fragment_store.hpp" +#include "./object_store.hpp" + +#include "./uuid_generator.hpp" #include "./message_serializer.hpp" -#include #include #include @@ -22,8 +22,8 @@ namespace Message::Components { // unused, consumes too much memory (highly compressable) //using FUID = FragComp::ID; - struct FID { - FragmentID fid {entt::null}; + struct Obj { + Object o {entt::null}; }; // points to the front/newer message @@ -79,32 +79,35 @@ namespace Fragment::Components { // on new message: assign fuid // on new and update: mark as fragment dirty // on delete: mark as fragment dirty? -class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentStoreEventI { +class MessageFragmentStore : public RegistryMessageModelEventI, public ObjectStoreEventI { protected: Contact3Registry& _cr; RegistryMessageModel& _rmm; - FragmentStore& _fs; + ObjectStore2& _os; + StorageBackendI& _sb; bool _fs_ignore_event {false}; + UUIDGenerator_128_128 _session_uuid_gen; + // for message components only MessageSerializerCallbacks _sc; void handleMessage(const Message3Handle& m); - void loadFragment(Message3Registry& reg, FragmentHandle fh); + void loadFragment(Message3Registry& reg, ObjectHandle oh); - bool syncFragToStorage(FragmentHandle fh, Message3Registry& reg); + bool syncFragToStorage(ObjectHandle oh, Message3Registry& reg); struct SaveQueueEntry final { uint64_t ts_since_dirty{0}; //std::vector id; - FragmentID id; + ObjectHandle id; Message3Registry* reg{nullptr}; }; std::deque _fuid_save_queue; struct ECQueueEntry final { - FragmentID fid; + ObjectHandle fid; Contact3 c; }; std::deque _event_check_queue; @@ -118,7 +121,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS MessageFragmentStore( Contact3Registry& cr, RegistryMessageModel& rmm, - FragmentStore& fs + ObjectStore2& os, + StorageBackendI& sb ); virtual ~MessageFragmentStore(void); @@ -126,14 +130,12 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS float tick(float time_delta); - void triggerScan(void); - protected: // rmm bool onEvent(const Message::Events::MessageConstruct& e) override; bool onEvent(const Message::Events::MessageUpdated& e) override; protected: // fs - bool onEvent(const Fragment::Events::FragmentConstruct& e) override; - bool onEvent(const Fragment::Events::FragmentUpdated& e) override; + bool onEvent(const ObjectStore::Events::ObjectConstruct& e) override; + bool onEvent(const ObjectStore::Events::ObjectUpdate& e) override; }; diff --git a/src/fragment_store/serializer.hpp b/src/fragment_store/serializer.hpp deleted file mode 100644 index a1daa98..0000000 --- a/src/fragment_store/serializer.hpp +++ /dev/null @@ -1,68 +0,0 @@ -#pragma once - -#include -#include -#include - -#include - -template -struct SerializerCallbacks { - using Registry = entt::basic_registry; - using Handle = entt::basic_handle; - - // nlohmann - // json/msgpack - using serialize_json_fn = bool(*)(const Handle h, nlohmann::json& out); - entt::dense_map _serl_json; - - using deserialize_json_fn = bool(*)(Handle h, const nlohmann::json& in); - entt::dense_map _deserl_json; - - template - static bool component_get_json(const Handle h, nlohmann::json& j) { - if (h.template all_of()) { - if constexpr (!std::is_empty_v) { - j = h.template get(); - } - return true; - } - - return false; - } - - template - static bool component_emplace_or_replace_json(Handle h, const nlohmann::json& j) { - if constexpr (std::is_empty_v) { - h.template emplace_or_replace(); // assert empty json? - } else { - h.template emplace_or_replace(static_cast(j)); - } - return true; - } - - void registerSerializerJson(serialize_json_fn fn, const entt::type_info& type_info) { - _serl_json[type_info.hash()] = fn; - } - - template - void registerSerializerJson( - serialize_json_fn fn = component_get_json, - const entt::type_info& type_info = entt::type_id() - ) { - registerSerializerJson(fn, type_info); - } - - void registerDeSerializerJson(deserialize_json_fn fn, const entt::type_info& type_info) { - _deserl_json[type_info.hash()] = fn; - } - - template - void registerDeSerializerJson( - deserialize_json_fn fn = component_emplace_or_replace_json, - const entt::type_info& type_info = entt::type_id() - ) { - registerDeSerializerJson(fn, type_info); - } -}; - diff --git a/src/fragment_store/test_fragstore.cpp b/src/fragment_store/test_fragstore.cpp deleted file mode 100644 index 1319e29..0000000 --- a/src/fragment_store/test_fragstore.cpp +++ /dev/null @@ -1,80 +0,0 @@ -#include -#include - -#include "./fragment_store.hpp" - -#include -#include - -namespace Components { - struct MessagesTimestampRange { - uint64_t begin {0}; - uint64_t end {1000}; - }; - - NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesTimestampRange, begin, end) -} // Components - - -int main(void) { - FragmentStore fs; - fs._default_store_path = "test_store/"; - fs._sc.registerSerializerJson(); - fs._sc.registerDeSerializerJson(); - - const auto frag0 = fs.newFragmentFile("", MetaFileType::TEXT_JSON, {0xff, 0xf1, 0xf2, 0xf0, 0xff, 0xff, 0xff, 0xf9}); - - const auto frag1 = fs.newFragmentFile("", MetaFileType::BINARY_MSGPACK); - - const auto frag2 = fs.newFragmentFile("", MetaFileType::BINARY_MSGPACK); - - { - auto frag0h = fs.fragmentHandle(frag0); - - frag0h.emplace_or_replace(); - frag0h.emplace_or_replace(); - frag0h.emplace_or_replace(); - - std::function fn_cb = [read = 0ul](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t { - uint64_t i = 0; - for (; i+read < 3000 && i < buffer_size; i++) { - request_buffer[i] = uint8_t((i+read) & 0xff); - } - read += i; - - return i; - }; - fs.syncToStorage(frag0, fn_cb); - } - - { - auto frag1h = fs.fragmentHandle(frag1); - - frag1h.emplace_or_replace().comp = Compression::ZSTD; - frag1h.emplace_or_replace(); - - std::function fn_cb = [read = 0ul](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t { - static constexpr std::string_view text = "This is some random data"; - uint64_t i = 0; - for (; i+read < text.size() && i < buffer_size; i++) { - request_buffer[i] = text[i+read]; - } - read += i; - - return i; - }; - fs.syncToStorage(frag1, fn_cb); - } - - { - auto frag2h = fs.fragmentHandle(frag2); - - frag2h.emplace_or_replace(); - frag2h.emplace_or_replace(); - - static constexpr std::string_view text = "This is more random data"; - fs.syncToStorage(frag2, reinterpret_cast(text.data()), text.size()); - } - return 0; -} - diff --git a/src/main_screen.cpp b/src/main_screen.cpp index 6ed720d..c588f35 100644 --- a/src/main_screen.cpp +++ b/src/main_screen.cpp @@ -16,7 +16,8 @@ MainScreen::MainScreen(SDL_Renderer* renderer_, std::string save_path, std::stri renderer(renderer_), rmm(cr), mts(rmm), - mfs(cr, rmm, fs), + mfsb(os, "test2_message_store/"), + mfs(cr, rmm, os, mfsb), tc(save_path, save_password), tpi(tc.getTox()), ad(tc), @@ -55,9 +56,7 @@ MainScreen::MainScreen(SDL_Renderer* renderer_, std::string save_path, std::stri std::cout << "own address: " << tc.toxSelfGetAddressStr() << "\n"; { // setup plugin instances - // TODO: make interface useful - g_provideInstance("FragmentStoreI", "host", &fs); - g_provideInstance("FragmentStore", "host", &fs); + g_provideInstance("ObjectStore2", "host", &os); g_provideInstance("ConfigModelI", "host", &conf); g_provideInstance("Contact3Registry", "1", "host", &cr); @@ -85,7 +84,7 @@ MainScreen::MainScreen(SDL_Renderer* renderer_, std::string save_path, std::stri conf.dump(); - mfs.triggerScan(); // HACK: after plugins and tox contacts got loaded + mfsb.scanAsync(); // HACK: after plugins and tox contacts got loaded } MainScreen::~MainScreen(void) { diff --git a/src/main_screen.hpp b/src/main_screen.hpp index 39baf7e..abe77f3 100644 --- a/src/main_screen.hpp +++ b/src/main_screen.hpp @@ -2,7 +2,8 @@ #include "./screen.hpp" -#include "./fragment_store/fragment_store.hpp" +#include "./fragment_store/object_store.hpp" +#include "./fragment_store/backends/filesystem_storage.hpp" #include #include #include @@ -45,12 +46,13 @@ extern "C" { struct MainScreen final : public Screen { SDL_Renderer* renderer; - FragmentStore fs; + ObjectStore2 os; SimpleConfigModel conf; Contact3Registry cr; RegistryMessageModel rmm; MessageTimeSort mts; + backend::FilesystemStorage mfsb; // message fsb // TODO: make configurable MessageFragmentStore mfs; ToxEventLogger tel{std::cout};