From 854ed851b40e49982c5afa5f3958afd88fe263cb Mon Sep 17 00:00:00 2001 From: Green Sky Date: Wed, 10 Apr 2024 16:26:24 +0200 Subject: [PATCH] fixes missing comps and conversion tool is almost working (most of meta is transfered) --- .../backends/filesystem_storage.cpp | 22 +++++- .../backends/filesystem_storage.hpp | 2 +- src/fragment_store/convert_frag_to_obj.cpp | 76 +++++++++++++++++++ src/fragment_store/object_store.hpp | 2 +- 4 files changed, 98 insertions(+), 4 deletions(-) diff --git a/src/fragment_store/backends/filesystem_storage.cpp b/src/fragment_store/backends/filesystem_storage.cpp index 45bfe00..222a971 100644 --- a/src/fragment_store/backends/filesystem_storage.cpp +++ b/src/fragment_store/backends/filesystem_storage.cpp @@ -40,6 +40,10 @@ static ByteSpan spanFromRead(const std::variant>& // TODO: move somewhere else static bool serl_json_data_enc_type(const ObjectHandle oh, nlohmann::json& out) { + if (!oh.all_of()) { + return false; + } + out = static_cast>( oh.get().enc ); @@ -56,6 +60,10 @@ static bool deserl_json_data_enc_type(ObjectHandle oh, const nlohmann::json& in) } static bool serl_json_data_comp_type(const ObjectHandle oh, nlohmann::json& out) { + if (!oh.all_of()) { + return false; + } + out = static_cast>( oh.get().comp ); @@ -524,6 +532,9 @@ size_t FilesystemStorage::scanPath(std::string_view path) { // main thread // TODO: check timestamps of preexisting and reload? mark external/remote dirty? for (const auto& it : file_obj_list) { + MetaFileType mft {MetaFileType::TEXT_JSON}; + Encryption meta_enc {Encryption::NONE}; + Compression meta_comp {Compression::NONE}; nlohmann::json j; if (it.meta_ext == ".meta.msgpack") { std::ifstream file(it.obj_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary); @@ -532,6 +543,8 @@ size_t FilesystemStorage::scanPath(std::string_view path) { continue; } + mft = MetaFileType::BINARY_MSGPACK; + // file is a msgpack within a msgpack std::vector full_meta_data; @@ -558,13 +571,13 @@ size_t FilesystemStorage::scanPath(std::string_view path) { continue; } - Encryption meta_enc = meta_header_j.at(1); + meta_enc = meta_header_j.at(1); if (meta_enc != Encryption::NONE) { std::cerr << "FS error: unknown encryption " << it.obj_path << "\n"; continue; } - Compression meta_comp = meta_header_j.at(2); + meta_comp = meta_header_j.at(2); if (meta_comp != Compression::NONE && meta_comp != Compression::ZSTD) { std::cerr << "FS error: unknown compression " << it.obj_path << "\n"; continue; @@ -598,6 +611,8 @@ size_t FilesystemStorage::scanPath(std::string_view path) { continue; } + mft = MetaFileType::TEXT_JSON; + file >> j; } else { assert(false); @@ -613,6 +628,9 @@ size_t FilesystemStorage::scanPath(std::string_view path) { ObjectHandle oh{_os.registry(), _os.registry().create()}; oh.emplace(this); oh.emplace(hex2bin(it.id_str)); + oh.emplace(mft); + oh.emplace(meta_enc); + oh.emplace(meta_comp); oh.emplace(it.obj_path.generic_u8string()); diff --git a/src/fragment_store/backends/filesystem_storage.hpp b/src/fragment_store/backends/filesystem_storage.hpp index 17c1c48..ef5c111 100644 --- a/src/fragment_store/backends/filesystem_storage.hpp +++ b/src/fragment_store/backends/filesystem_storage.hpp @@ -27,7 +27,7 @@ struct FilesystemStorage : public StorageBackendI { size_t scanPath(std::string_view path); void scanPathAsync(std::string path); - private: + public: // TODO: private? // this thing needs to change and be facilitated over the OS // but the json serializer are specific to the backend SerializerCallbacks _sc; diff --git a/src/fragment_store/convert_frag_to_obj.cpp b/src/fragment_store/convert_frag_to_obj.cpp index 772d7bf..5b4cd4a 100644 --- a/src/fragment_store/convert_frag_to_obj.cpp +++ b/src/fragment_store/convert_frag_to_obj.cpp @@ -1,9 +1,14 @@ #include "./object_store.hpp" #include "./backends/filesystem_storage.hpp" +#include "./meta_components.hpp" + +#include #include #include +#include + int main(int argc, const char** argv) { if (argc != 3) { std::cerr << "wrong paramter count, do " << argv[0] << " \n"; @@ -26,6 +31,77 @@ int main(int argc, const char** argv) { // add message fragment store too (adds meta?) // hookup events + struct EventListener : public ObjectStoreEventI { + ObjectStore2& _os_src; + backend::FilesystemStorage& _fsb_src; + + ObjectStore2& _os_dst; + backend::FilesystemStorage& _fsb_dst; + + EventListener( + ObjectStore2& os_src, + backend::FilesystemStorage& fsb_src, + ObjectStore2& os_dst, + backend::FilesystemStorage& fsb_dst + ) : + _os_src(os_src), + _fsb_src(fsb_src), + _os_dst(os_dst), + _fsb_dst(fsb_dst) + { + _os_src.subscribe(this, ObjectStore_Event::object_construct); + _os_src.subscribe(this, ObjectStore_Event::object_update); + } + + protected: // os + bool onEvent(const ObjectStore::Events::ObjectConstruct& e) override { + assert(e.e.all_of()); + assert(e.e.all_of()); + auto oh = _fsb_dst.newObject(e.e.get().type, ByteSpan{e.e.get().v}); + + if (!static_cast(oh)) { + // already exists + return false; + } + + { // sync meta + // some hardcoded ehpemeral (besides mft/id) + oh.emplace_or_replace(e.e.get_or_emplace()); + oh.emplace_or_replace(e.e.get_or_emplace()); + + // serializable + for (const auto& [type, fn] : _fsb_src._sc._serl_json) { + //if (!e.e.registry()->storage(type)->contains(e.e)) { + //continue; + //} + + // this is hacky but we serialize and then deserialize the component + // raw copy might be better in the future + nlohmann::json tmp_j; + if (fn(e.e, tmp_j)) { + _fsb_dst._sc._deserl_json.at(type)(oh, tmp_j); + } + } + } + + // read src and write dst data + static_cast(_fsb_dst).write(oh, ByteSpan{}); + + return false; + } + + bool onEvent(const ObjectStore::Events::ObjectUpdate&) override { + std::cerr << "Update called\n"; + assert(false); + return false; + } + } el { + os_src, + fsb_src, + os_dst, + fsb_dst, + }; + // perform scan (which triggers events) fsb_dst.scanAsync(); // fill with existing? fsb_src.scanAsync(); // the scan diff --git a/src/fragment_store/object_store.hpp b/src/fragment_store/object_store.hpp index 6b48ade..fcc4456 100644 --- a/src/fragment_store/object_store.hpp +++ b/src/fragment_store/object_store.hpp @@ -29,7 +29,7 @@ struct StorageBackendI { // calls data_cb with a buffer to be filled in, cb returns actual count of data. if returned < max, its the last buffer. virtual bool write(Object o, std::function& data_cb) = 0; //virtual bool write(Object o, const uint8_t* data, const uint64_t data_size); // default impl - virtual bool write(Object o, const ByteSpan data); // default impl + bool write(Object o, const ByteSpan data); // ========== read object from storage ========== using read_from_storage_put_data_cb = void(const ByteSpan buffer);