From 4d3ffb8192623740f6e170855ee1cffd428b78da Mon Sep 17 00:00:00 2001
From: Green Sky
Date: Fri, 12 Apr 2024 13:11:31 +0200
Subject: [PATCH] add filesystem backend

---
 CMakeLists.txt                                  |   1 +
 src/CMakeLists.txt                              |  16 +
 .../object_store/backends/file2_stack.cpp       |  93 +++
 .../object_store/backends/file2_stack.hpp       |  23 +
 .../backends/filesystem_storage.cpp             | 627 ++++++++++++++++++
 .../backends/filesystem_storage.hpp             |  41 ++
 6 files changed, 801 insertions(+)
 create mode 100644 src/solanaceae/object_store/backends/file2_stack.cpp
 create mode 100644 src/solanaceae/object_store/backends/file2_stack.hpp
 create mode 100644 src/solanaceae/object_store/backends/filesystem_storage.cpp
 create mode 100644 src/solanaceae/object_store/backends/filesystem_storage.hpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 94f24a0..9ed8369 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,6 +12,7 @@ message("II SOLANACEAE_OBJECT_STORE_STANDALONE " ${SOLANACEAE_OBJECT_STORE_STANDALONE})
 
 #option(SOLANACEAE_OBJECT_STORE_BUILD_PLUGINS "Build the solanaceae_object_store plugins" ${SOLANACEAE_OBJECT_STORE_STANDALONE})
 option(SOLANACEAE_OBJECT_STORE_BUILD_TESTING "Build the solanaceae_object_store tests" ${SOLANACEAE_OBJECT_STORE_STANDALONE})
+message("II SOLANACEAE_OBJECT_STORE_BUILD_TESTING " ${SOLANACEAE_OBJECT_STORE_BUILD_TESTING})
 
 if (SOLANACEAE_OBJECT_STORE_STANDALONE)
 	set(CMAKE_POSITION_INDEPENDENT_CODE ON)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 2eb079b..9a0c9c0 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -33,3 +33,19 @@ target_link_libraries(solanaceae_file2_zstd PUBLIC
 	zstd::zstd
 )
 
+########################################
+
+add_library(solanaceae_object_store_backend_filesystem
+	./solanaceae/object_store/backends/file2_stack.hpp
+	./solanaceae/object_store/backends/file2_stack.cpp
+	./solanaceae/object_store/backends/filesystem_storage.hpp
+	./solanaceae/object_store/backends/filesystem_storage.cpp
+)
+
+target_include_directories(solanaceae_object_store_backend_filesystem PUBLIC .)
+target_compile_features(solanaceae_object_store_backend_filesystem PUBLIC cxx_std_17)
+target_link_libraries(solanaceae_object_store_backend_filesystem PUBLIC
+	solanaceae_object_store
+	solanaceae_file2_zstd
+)
+
diff --git a/src/solanaceae/object_store/backends/file2_stack.cpp b/src/solanaceae/object_store/backends/file2_stack.cpp
new file mode 100644
index 0000000..bf35fcc
--- /dev/null
+++ b/src/solanaceae/object_store/backends/file2_stack.cpp
@@ -0,0 +1,93 @@
+#include "./file2_stack.hpp"
+
+#include <solanaceae/file/file2_std.hpp>
+#include <solanaceae/file/file2_zstd.hpp>
+
+#include <cassert>
+
+#include <iostream>
+
+// add enc and comp file layers
+// assumes a file is already in the stack
+bool buildStackRead(std::stack<std::unique_ptr<File2I>>& file_stack, Encryption encryption, Compression compression) {
+	assert(!file_stack.empty());
+
+	// TODO: decrypt here
+	assert(encryption == Encryption::NONE);
+
+	// add layer based on enum
+	if (compression == Compression::ZSTD) {
+		file_stack.push(std::make_unique<File2ZSTDR>(*file_stack.top().get()));
+	} else {
+		// TODO: make error instead
+		assert(compression == Compression::NONE);
+	}
+
+	if (!file_stack.top()->isGood()) {
+		std::cerr << "FS error: file failed to add " << (int)compression << " decompression layer\n";
+		return false;
+	}
+
+	return true;
+}
+
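+// usage sketch (illustrative comment; assumes the File2RFile/File2ZSTDR types
+// pulled in by the includes above, error handling elided):
+//   std::stack<std::unique_ptr<File2I>> stack;
+//   stack.push(std::make_unique<File2RFile>(std::string_view{"some/object"}));
+//   if (buildStackRead(stack, Encryption::NONE, Compression::ZSTD)) {
+//       auto chunk = stack.top()->read(1024); // reads through the zstd layer
+//   }
+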
+// do i need this?
+std::stack<std::unique_ptr<File2I>> buildFileStackRead(std::string_view file_path, Encryption encryption, Compression compression) {
+	std::stack<std::unique_ptr<File2I>> file_stack;
+	file_stack.push(std::make_unique<File2RFile>(file_path));
+
+	if (!file_stack.top()->isGood()) {
+		std::cerr << "FS error: opening file for reading '" << file_path << "'\n";
+		return {};
+	}
+
+	if (!buildStackRead(file_stack, encryption, compression)) {
+		std::cerr << "FS error: file failed to add layers for '" << file_path << "'\n";
+		return {};
+	}
+
+	return file_stack;
+}
+
+// add enc and comp file layers
+// assumes a file is already in the stack
+bool buildStackWrite(std::stack<std::unique_ptr<File2I>>& file_stack, Encryption encryption, Compression compression) {
+	assert(!file_stack.empty());
+
+	// TODO: encrypt here
+	assert(encryption == Encryption::NONE);
+
+	// add layer based on enum
+	if (compression == Compression::ZSTD) {
+		file_stack.push(std::make_unique<File2ZSTDW>(*file_stack.top().get()));
+	} else {
+		// TODO: make error instead
+		assert(compression == Compression::NONE);
+	}
+
+	if (!file_stack.top()->isGood()) {
+		std::cerr << "FS error: file failed to add " << (int)compression << " compression layer\n";
+		return false;
+	}
+
+	return true;
+}
+
+// do i need this?
+std::stack<std::unique_ptr<File2I>> buildFileStackWrite(std::string_view file_path, Encryption encryption, Compression compression) {
+	std::stack<std::unique_ptr<File2I>> file_stack;
+	file_stack.push(std::make_unique<File2WFile>(file_path));
+
+	if (!file_stack.top()->isGood()) {
+		std::cerr << "FS error: opening file for writing '" << file_path << "'\n";
+		return {};
+	}
+
+	if (!buildStackWrite(file_stack, encryption, compression)) {
+		std::cerr << "FS error: file failed to add layers for '" << file_path << "'\n";
+		return {};
+	}
+
+	return file_stack;
+}
+
diff --git a/src/solanaceae/object_store/backends/file2_stack.hpp b/src/solanaceae/object_store/backends/file2_stack.hpp
new file mode 100644
index 0000000..a94e1a6
--- /dev/null
+++ b/src/solanaceae/object_store/backends/file2_stack.hpp
@@ -0,0 +1,23 @@
+#pragma once
+
+#include "../types.hpp"
+
+#include <solanaceae/file/file2.hpp>
+
+#include <stack>
+#include <memory>
+#include <string_view>
+
+// add enc and comp file layers
+// assumes a file is already in the stack
+[[nodiscard]] bool buildStackRead(std::stack<std::unique_ptr<File2I>>& file_stack, Encryption encryption, Compression compression);
+
+// do i need this?
+[[nodiscard]] std::stack<std::unique_ptr<File2I>> buildFileStackRead(std::string_view file_path, Encryption encryption, Compression compression);
+
+// add enc and comp file layers
+// assumes a file is already in the stack
+[[nodiscard]] bool buildStackWrite(std::stack<std::unique_ptr<File2I>>& file_stack, Encryption encryption, Compression compression);
+
+// do i need this?
+[[nodiscard]] std::stack<std::unique_ptr<File2I>> buildFileStackWrite(std::string_view file_path, Encryption encryption, Compression compression);
diff --git a/src/solanaceae/object_store/backends/filesystem_storage.cpp b/src/solanaceae/object_store/backends/filesystem_storage.cpp
new file mode 100644
index 0000000..bbc00fb
--- /dev/null
+++ b/src/solanaceae/object_store/backends/filesystem_storage.cpp
@@ -0,0 +1,627 @@
+#include "./filesystem_storage.hpp"
+
+#include <solanaceae/object_store/meta_components.hpp>
+#include <solanaceae/object_store/serializer_json.hpp>
+
+#include <solanaceae/util/utils.hpp>
+
+#include <solanaceae/file/file2_std.hpp>
+#include <solanaceae/file/file2_mem.hpp>
+
+#include <entt/container/dense_set.hpp>
+#include <nlohmann/json.hpp>
+
+#include "./file2_stack.hpp"
+
+#include <filesystem>
+#include <fstream>
+
+#include <iostream>
+#include <cassert>
+
+static const char* metaFileTypeSuffix(MetaFileType mft) {
+	switch (mft) {
+		case MetaFileType::TEXT_JSON: return ".json";
+		//case MetaFileType::BINARY_ARB: return ".bin";
+		case MetaFileType::BINARY_MSGPACK: return ".msgpack";
+	}
+	return ""; // .unk?
+}
+
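+// resulting sidecar names, e.g. for an object data file "ab/cdef":
+//   "ab/cdef.meta.json"    (TEXT_JSON)
+//   "ab/cdef.meta.msgpack" (BINARY_MSGPACK)
+// (see write() and scanPath() below)
+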
+// TODO: move to ... somewhere. (span? file2i?)
+static ByteSpan spanFromRead(const std::variant<ByteSpan, std::vector<uint8_t>>& data_var) {
+	if (std::holds_alternative<std::vector<uint8_t>>(data_var)) {
+		auto& vec = std::get<std::vector<uint8_t>>(data_var);
+		return {vec.data(), vec.size()};
+	} else if (std::holds_alternative<ByteSpan>(data_var)) {
+		return std::get<ByteSpan>(data_var);
+	} else {
+		assert(false);
+		return {};
+	}
+}
+
+
+namespace backend {
+
+FilesystemStorage::FilesystemStorage(
+	ObjectStore2& os,
+	std::string_view storage_path,
+	MetaFileType mft_new
+) : StorageBackendI::StorageBackendI(os), _storage_path(storage_path), _mft_new(mft_new) {
+}
+
+FilesystemStorage::~FilesystemStorage(void) {
+}
+
+ObjectHandle FilesystemStorage::newObject(ByteSpan id) {
+	{ // first check if id is already used (TODO: solve the multi obj/backend problem)
+		auto existing_oh = _os.getOneObjectByID(id);
+		if (static_cast<bool>(existing_oh)) {
+			return {};
+		}
+	}
+
+	std::filesystem::create_directories(_storage_path);
+
+	if (!std::filesystem::is_directory(_storage_path)) {
+		std::cerr << "FS error: failed to create storage path dir '" << _storage_path << "'\n";
+		return {};
+	}
+
+	const auto id_hex = bin2hex(id);
+	std::filesystem::path object_file_path;
+
+	// TODO: refactor this magic somehow
+	if (id_hex.size() < 6) {
+		object_file_path = std::filesystem::path{_storage_path} / id_hex;
+	} else {
+		// use the first 2 hex chars (1 byte) as a subfolder
+		const auto subfolder_path = std::filesystem::path{_storage_path} / id_hex.substr(0, 2);
+		std::filesystem::create_directories(subfolder_path);
+		object_file_path = subfolder_path / id_hex.substr(2);
+	}
+
+	if (std::filesystem::exists(object_file_path)) {
+		std::cerr << "FS error: object already exists in path '" << id_hex << "'\n";
+		return {};
+	}
+
+	ObjectHandle oh{_os.registry(), _os.registry().create()};
+
+	oh.emplace<ObjComp::Ephemeral::Backend>(this);
+	oh.emplace<ObjComp::ID>(std::vector<uint8_t>{id.cbegin(), id.cend()});
+	oh.emplace<ObjComp::Ephemeral::FilePath>(object_file_path.generic_u8string());
+	oh.emplace<ObjComp::Ephemeral::MetaFileType>(_mft_new);
+
+	// meta needs to be synced to file
+	std::function<uint64_t(uint8_t*, uint64_t)> empty_data_cb = [](auto*, auto) -> uint64_t { return 0; };
+	if (!write(oh, empty_data_cb)) {
+		std::cerr << "FS error: write failed while creating new object file\n";
+		oh.destroy();
+		return {};
+	}
+
+	// while new metadata might be created here, making sure the file could be created is more important
+	_os.throwEventConstruct(oh);
+
+	return oh;
+}
+
+bool FilesystemStorage::write(Object o, std::function<uint64_t(uint8_t*, uint64_t)>& data_cb) {
+	auto& reg = _os.registry();
+
+	if (!reg.valid(o)) {
+		return false;
+	}
+
+	ObjectHandle oh {reg, o};
+
+	if (!oh.all_of<ObjComp::Ephemeral::FilePath>()) {
+		// not a file fragment?
+		return false;
+	}
+
+	// split object storage (meta and data are 2 files)
+
+	MetaFileType meta_type = MetaFileType::TEXT_JSON; // TODO: better defaults?
+	if (reg.all_of<ObjComp::Ephemeral::MetaFileType>(o)) {
+		meta_type = oh.get<ObjComp::Ephemeral::MetaFileType>().type;
+	}
+
+	Encryption meta_enc = Encryption::NONE; // TODO: better defaults?
+	Compression meta_comp = Compression::NONE; // TODO: better defaults?
+
+	if (meta_type != MetaFileType::TEXT_JSON) {
+		if (oh.all_of<ObjComp::Ephemeral::MetaEncryptionType>()) {
+			meta_enc = oh.get<ObjComp::Ephemeral::MetaEncryptionType>().enc;
+		}
+
+		if (oh.all_of<ObjComp::Ephemeral::MetaCompressionType>()) {
+			meta_comp = oh.get<ObjComp::Ephemeral::MetaCompressionType>().comp;
+		}
+	} else {
+		// we can't have encryption or compression,
+		// so we force NONE for TEXT_JSON
+
+		oh.emplace_or_replace<ObjComp::Ephemeral::MetaEncryptionType>(Encryption::NONE);
+		oh.emplace_or_replace<ObjComp::Ephemeral::MetaCompressionType>(Compression::NONE);
+	}
+
+	std::filesystem::path meta_tmp_path = oh.get<ObjComp::Ephemeral::FilePath>().path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp";
+	meta_tmp_path.replace_filename("." + meta_tmp_path.filename().generic_u8string());
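+	// tmp files are dot-prefixed (e.g. ".cdef.meta.msgpack.tmp") and carry an
+	// extension, so scanPath() ignores them and unixy tooling treats them as hidden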
+	// TODO: make meta comp work with mem compressor
+	//auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp);
+	std::stack<std::unique_ptr<File2I>> meta_file_stack;
+	meta_file_stack.push(std::make_unique<File2WFile>(std::string_view{meta_tmp_path.generic_u8string()}));
+
+	if (meta_file_stack.empty()) {
+		std::cerr << "FS error: failed to create temporary meta file stack\n";
+		std::filesystem::remove(meta_tmp_path); // might have created an empty file
+		return false;
+	}
+
+	Encryption data_enc = Encryption::NONE; // TODO: better defaults
+	Compression data_comp = Compression::NONE; // TODO: better defaults
+	if (oh.all_of<ObjComp::DataEncryptionType>()) {
+		data_enc = oh.get<ObjComp::DataEncryptionType>().enc;
+	}
+	if (oh.all_of<ObjComp::DataCompressionType>()) {
+		data_comp = oh.get<ObjComp::DataCompressionType>().comp;
+	}
+
+	std::filesystem::path data_tmp_path = oh.get<ObjComp::Ephemeral::FilePath>().path + ".tmp";
+	data_tmp_path.replace_filename("." + data_tmp_path.filename().generic_u8string());
+	auto data_file_stack = buildFileStackWrite(std::string_view{data_tmp_path.generic_u8string()}, data_enc, data_comp);
+	if (data_file_stack.empty()) {
+		while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
+		std::filesystem::remove(meta_tmp_path);
+		std::cerr << "FS error: failed to create temporary data file stack\n";
+		return false;
+	}
+
+	try { // TODO: properly sanitize strings, so this can't throw
+		// sharing code between binary msgpack and text json for now
+		nlohmann::json meta_data_j = nlohmann::json::object(); // metadata needs to be an object, null not allowed
+		// metadata file
+
+		auto& sjc = _os.registry().ctx().get<SerializerJsonCallbacks<Object>>();
+
+		// TODO: refactor extract to OS
+		for (const auto& [type_id, storage] : reg.storage()) {
+			if (!storage.contains(o)) {
+				continue;
+			}
+
+			//std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n";
+
+			// use type_id to find serializer
+			auto s_cb_it = sjc._serl.find(type_id);
+			if (s_cb_it == sjc._serl.end()) {
+				// could not find serializer, not saving
+				continue;
+			}
+
+			// noooo, why can't numbers be keys
+			//if (meta_type == MetaFileType::BINARY_MSGPACK) { // msgpack uses the hash id instead
+				//s_cb_it->second(storage.value(fid), meta_data[storage.type().hash()]);
+			//} else if (meta_type == MetaFileType::TEXT_JSON) {
+				s_cb_it->second(oh, meta_data_j[storage.type().name()]);
+			//}
+		}
+
+		if (meta_type == MetaFileType::BINARY_MSGPACK) { // binary metadata file
+			std::vector<uint8_t> binary_meta_data;
+			{
+				std::stack<std::unique_ptr<File2I>> binary_writer_stack;
+				binary_writer_stack.push(std::make_unique<File2MemW>(binary_meta_data));
+
+				if (!buildStackWrite(binary_writer_stack, meta_enc, meta_comp)) {
+					while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
+					std::filesystem::remove(meta_tmp_path);
+					while (!data_file_stack.empty()) { data_file_stack.pop(); }
+					std::filesystem::remove(data_tmp_path);
+					std::cerr << "FS error: binary writer creation failed '" << oh.get<ObjComp::Ephemeral::FilePath>().path << "'\n";
+					return false;
+				}
+
+				{
+					const std::vector<uint8_t> meta_data = nlohmann::json::to_msgpack(meta_data_j);
+					if (!binary_writer_stack.top()->write(ByteSpan{meta_data})) {
+						// i feel like exceptions or refactoring would be nice here
+						while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
+						std::filesystem::remove(meta_tmp_path);
+						while (!data_file_stack.empty()) { data_file_stack.pop(); }
+						std::filesystem::remove(data_tmp_path);
+						std::cerr << "FS error: binary writer failed '" << oh.get<ObjComp::Ephemeral::FilePath>().path << "'\n";
+						return false;
+					}
+				}
+			}
+
+			//// the meta file is itself msgpack data
+			nlohmann::json meta_header_j = nlohmann::json::array();
+			meta_header_j.emplace_back() = "SOLMET";
+			meta_header_j.push_back(meta_enc);
+			meta_header_j.push_back(meta_comp);
+
+			// with a custom msgpack impl like cmp, we can be smarter here and don't need an extra buffer
+			meta_header_j.push_back(nlohmann::json::binary(binary_meta_data));
+
+			const auto meta_header_data = nlohmann::json::to_msgpack(meta_header_j);
+			meta_file_stack.top()->write({meta_header_data.data(), meta_header_data.size()});
+		} else if (meta_type == MetaFileType::TEXT_JSON) {
+			// can't be compressed or encrypted
+			const auto meta_file_json_str = meta_data_j.dump(2, ' ', true);
+			meta_file_stack.top()->write({reinterpret_cast<const uint8_t*>(meta_file_json_str.data()), meta_file_json_str.size()});
+		}
+	} catch (...) {
+		while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
+		std::filesystem::remove(meta_tmp_path);
+		while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
+		std::filesystem::remove(data_tmp_path);
+		std::cerr << "FS error: failed to serialize json data\n";
+		return false;
+	}
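+	// resulting meta file layout:
+	//   TEXT_JSON:      the plain json object, neither encrypted nor compressed
+	//   BINARY_MSGPACK: msgpack array ["SOLMET", enc, comp, bin(inner msgpack meta,
+	//                   optionally compressed/encrypted)]
+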
+	// now data
+	// for zstd compression, chunk size is frame size. (no cross frame referencing)
+	// TODO: add buffering stream layer
+	static constexpr int64_t chunk_size{1024*1024}; // 1MiB should be enough
+	std::vector<uint8_t> buffer(chunk_size);
+	uint64_t buffer_actual_size {0};
+	do {
+		buffer_actual_size = data_cb(buffer.data(), buffer.size());
+		if (buffer_actual_size == 0) {
+			break;
+		}
+		if (buffer_actual_size > buffer.size()) {
+			// wtf
+			break;
+		}
+
+		data_file_stack.top()->write({buffer.data(), buffer_actual_size});
+	} while (buffer_actual_size == buffer.size());
+
+	// flush // TODO: use scope
+	while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
+	while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
+
+	std::filesystem::rename(
+		meta_tmp_path,
+		oh.get<ObjComp::Ephemeral::FilePath>().path + ".meta" + metaFileTypeSuffix(meta_type)
+	);
+
+	std::filesystem::rename(
+		data_tmp_path,
+		oh.get<ObjComp::Ephemeral::FilePath>().path
+	);
+
+	// TODO: check return value of renames
+
+	if (oh.all_of<ObjComp::Ephemeral::DirtyTag>()) {
+		oh.remove<ObjComp::Ephemeral::DirtyTag>();
+	}
+
+	return true;
+}
+
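+// usage sketch (hypothetical caller; fill()/consume() stand in for a real data
+// source/sink):
+//   std::function<uint64_t(uint8_t*, uint64_t)> src = [&](uint8_t* buf, uint64_t size) -> uint64_t {
+//       return fill(buf, size); // return less than `size` (or 0) to signal eof
+//   };
+//   backend.write(o, src);
+//
+//   std::function<void(const ByteSpan)> sink = [&](const ByteSpan data) { consume(data); };
+//   backend.read(o, sink);
+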
+bool FilesystemStorage::read(Object o, std::function<void(const ByteSpan)>& data_cb) {
+	auto& reg = _os.registry();
+
+	if (!reg.valid(o)) {
+		return false;
+	}
+
+	ObjectHandle oh {reg, o};
+
+	if (!oh.all_of<ObjComp::Ephemeral::FilePath>()) {
+		// not a file
+		return false;
+	}
+
+	const auto& obj_path = oh.get<ObjComp::Ephemeral::FilePath>().path;
+
+	// TODO: check if metadata dirty?
+	// TODO: what if file changed on disk?
+
+	std::cout << "FS: loading fragment '" << obj_path << "'\n";
+
+	Compression data_comp = Compression::NONE;
+	if (oh.all_of<ObjComp::DataCompressionType>()) {
+		data_comp = oh.get<ObjComp::DataCompressionType>().comp;
+	}
+
+	auto data_file_stack = buildFileStackRead(std::string_view{obj_path}, Encryption::NONE, data_comp);
+	if (data_file_stack.empty()) {
+		return false;
+	}
+
+	// TODO: make it read in a single chunk instead?
+	static constexpr int64_t chunk_size {1024 * 1024}; // 1MiB should be good for read
+	do {
+		auto data_var = data_file_stack.top()->read(chunk_size);
+		ByteSpan data = spanFromRead(data_var);
+
+		if (data.empty()) {
+			// error or probably eof
+			break;
+		}
+
+		data_cb(data);
+
+		if (data.size < chunk_size) {
+			// eof
+			break;
+		}
+	} while (data_file_stack.top()->isGood());
+
+	return true;
+}
+
+void FilesystemStorage::scanAsync(void) {
+	scanPathAsync(_storage_path);
+}
+
+size_t FilesystemStorage::scanPath(std::string_view path) {
+	// TODO: extract so async can work (or/and make iteratable generator)
+
+	if (path.empty() || !std::filesystem::is_directory(path)) {
+		std::cerr << "FS error: scan path not a directory '" << path << "'\n";
+		return 0;
+	}
+
+	// step 1: make snapshot of files, validate metafiles and save id/path+meta.ext
+	// can be extra thread (if non vfs)
+	struct ObjFileEntry {
+		std::string id_str;
+		std::filesystem::path obj_path;
+		std::string meta_ext;
+
+		bool operator==(const ObjFileEntry& other) const {
+			// only compare by id
+			return id_str == other.id_str;
+		}
+	};
+	struct ObjFileEntryHash {
+		size_t operator()(const ObjFileEntry& it) const {
+			return entt::hashed_string(it.id_str.data(), it.id_str.size());
+		}
+	};
+	entt::dense_set<ObjFileEntry, ObjFileEntryHash> file_obj_list;
+
+	std::filesystem::path storage_path{path};
+
+	auto handle_file = [&](const std::filesystem::path& file_path) {
+		if (!std::filesystem::is_regular_file(file_path)) {
+			return;
+		}
+		// handle file
+
+		if (file_path.has_extension()) {
+			// skip over metadata, assuming only metafiles have extensions (might be wrong?)
+			// also skips temps
+			return;
+		}
+
+		auto relative_path = std::filesystem::proximate(file_path, storage_path);
+		std::string id_str = relative_path.generic_u8string();
+		// delete all '/'
+		id_str.erase(std::remove(id_str.begin(), id_str.end(), '/'), id_str.end());
+		if (id_str.size() % 2 != 0) {
+			std::cerr << "FS error: non hex object uid detected: '" << id_str << "'\n";
+		}
+
+		if (file_obj_list.contains(ObjFileEntry{id_str, {}, ""})) {
+			std::cerr << "FS error: object duplicate detected: '" << id_str << "'\n";
+			return; // skip
+		}
+
+		const char* meta_ext = ".meta.msgpack";
+		{ // find meta
+			// TODO: this has to know all possible extensions
+			bool has_meta_msgpack = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.msgpack");
+			bool has_meta_json = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.json");
+			const size_t meta_sum =
+				(has_meta_msgpack?1:0) +
+				(has_meta_json?1:0)
+			;
+
+			if (meta_sum > 1) { // has multiple
+				std::cerr << "FS error: object with multiple meta files detected: " << id_str << "\n";
+				return; // skip
+			}
+
+			if (meta_sum == 0) {
+				std::cerr << "FS error: object missing meta file detected: " << id_str << "\n";
+				return; // skip
+			}
+
+			if (has_meta_json) {
+				meta_ext = ".meta.json";
+			}
+		}
+
+		file_obj_list.emplace(ObjFileEntry{
+			std::move(id_str),
+			file_path,
+			meta_ext
+		});
+	};
+
+	for (const auto& outer_path : std::filesystem::directory_iterator(storage_path)) {
+		if (std::filesystem::is_regular_file(outer_path)) {
+			handle_file(outer_path);
+		} else if (std::filesystem::is_directory(outer_path)) {
+			// subdir, part of id
+			for (const auto& inner_path : std::filesystem::directory_iterator(outer_path)) {
+				//if (std::filesystem::is_regular_file(inner_path)) {
+				//	// handle file
+				//} // TODO: support deeper recursion?
+				handle_file(inner_path);
+			}
+		}
+	}
+
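+	// note: this mirrors the two-level layout newObject() creates: long ids are
+	// stored as "<first 2 hex>/<rest>" (id "aabbcc" -> "aa/bbcc"); stripping the
+	// '/' from the relative path above rejoins the id
+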
+	std::cout << "FS: scan found:\n";
+	for (const auto& it : file_obj_list) {
+		std::cout << " " << it.id_str << "\n";
+	}
+
+	// step 2: check if files preexist in reg
+	// main thread
+	// (merge into step 3 and do more error checking?)
+	// TODO: properly handle duplicates, dups from different backends might be legit
+	// important
+	for (auto it = file_obj_list.begin(); it != file_obj_list.end();) {
+		auto id = hex2bin(it->id_str);
+		auto oh = _os.getOneObjectByID(ByteSpan{id});
+		if (static_cast<bool>(oh)) {
+			// preexisting (handle differently??)
+			// check if store differs?
+			it = file_obj_list.erase(it);
+		} else {
+			it++;
+		}
+	}
+
+	auto& sjc = _os.registry().ctx().get<SerializerJsonCallbacks<Object>>();
+
+	std::vector<Object> scanned_objs;
+	// step 3: parse meta and insert into reg of non preexisting
+	// main thread
+	// TODO: check timestamps of preexisting and reload? mark external/remote dirty?
+	for (const auto& it : file_obj_list) {
+		MetaFileType mft {MetaFileType::TEXT_JSON};
+		Encryption meta_enc {Encryption::NONE};
+		Compression meta_comp {Compression::NONE};
+		nlohmann::json j;
+		if (it.meta_ext == ".meta.msgpack") {
+			std::ifstream file(it.obj_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary);
+			if (!file.is_open()) {
+				std::cerr << "FS error: failed opening meta " << it.obj_path << "\n";
+				continue;
+			}
+
+			mft = MetaFileType::BINARY_MSGPACK;
+
+			// file is a msgpack within a msgpack
+
+			std::vector<uint8_t> full_meta_data;
+			{ // read meta file
+				// figure out size
+				file.seekg(0, file.end);
+				uint64_t file_size = file.tellg();
+				file.seekg(0, file.beg);
+
+				full_meta_data.resize(file_size);
+
+				file.read(reinterpret_cast<char*>(full_meta_data.data()), full_meta_data.size());
+			}
+
+			const auto meta_header_j = nlohmann::json::from_msgpack(full_meta_data, true, false);
+
+			if (!meta_header_j.is_array() || meta_header_j.size() < 4) {
+				std::cerr << "FS error: broken binary meta " << it.obj_path << "\n";
+				continue;
+			}
+
+			if (meta_header_j.at(0) != "SOLMET") {
+				std::cerr << "FS error: wrong magic '" << meta_header_j.at(0) << "' in meta " << it.obj_path << "\n";
+				continue;
+			}
+
+			meta_enc = meta_header_j.at(1);
+			if (meta_enc != Encryption::NONE) {
+				std::cerr << "FS error: unknown encryption " << it.obj_path << "\n";
+				continue;
+			}
+
+			meta_comp = meta_header_j.at(2);
+			if (meta_comp != Compression::NONE && meta_comp != Compression::ZSTD) {
+				std::cerr << "FS error: unknown compression " << it.obj_path << "\n";
+				continue;
+			}
+
+			//const auto& meta_data_ref = meta_header_j.at(3).is_binary()?meta_header_j.at(3):meta_header_j.at(3).at("data");
+			if (!meta_header_j.at(3).is_binary()) {
+				std::cerr << "FS error: meta data not binary " << it.obj_path << "\n";
+				continue;
+			}
+			const nlohmann::json::binary_t& meta_data_ref = meta_header_j.at(3);
+
+			std::stack<std::unique_ptr<File2I>> binary_reader_stack;
+			binary_reader_stack.push(std::make_unique<File2MemR>(ByteSpan{meta_data_ref.data(), meta_data_ref.size()}));
+
+			if (!buildStackRead(binary_reader_stack, meta_enc, meta_comp)) {
+				std::cerr << "FS error: binary reader creation failed " << it.obj_path << "\n";
+				continue;
+			}
+
+			// HACK: read a fixed amount of data; this way, if we have neither enc nor comp, we pass the span through
+			auto binary_read_value = binary_reader_stack.top()->read(10*1024*1024); // is 10MiB large enough for meta?
+			const auto binary_read_span = spanFromRead(binary_read_value);
+			assert(binary_read_span.size < 10*1024*1024);
+
+			j = nlohmann::json::from_msgpack(binary_read_span, true, false);
+		} else if (it.meta_ext == ".meta.json") {
+			std::ifstream file(it.obj_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary);
+			if (!file.is_open()) {
+				std::cerr << "FS error: failed opening meta " << it.obj_path << "\n";
+				continue;
+			}
+
+			mft = MetaFileType::TEXT_JSON;
+
+			file >> j;
+		} else {
+			assert(false);
+		}
+
+		if (!j.is_object()) {
+			std::cerr << "FS error: json in meta is broken " << it.id_str << "\n";
+			continue;
+		}
+
+		// TODO: existing fragment file
+		//newFragmentFile();
+		ObjectHandle oh{_os.registry(), _os.registry().create()};
+		oh.emplace<ObjComp::Ephemeral::Backend>(this);
+		oh.emplace<ObjComp::ID>(hex2bin(it.id_str));
+		oh.emplace<ObjComp::Ephemeral::MetaFileType>(mft);
+		oh.emplace<ObjComp::Ephemeral::MetaEncryptionType>(meta_enc);
+		oh.emplace<ObjComp::Ephemeral::MetaCompressionType>(meta_comp);
+
+		oh.emplace<ObjComp::Ephemeral::FilePath>(it.obj_path.generic_u8string());
+
+		for (const auto& [k, v] : j.items()) {
+			// type id from string hash
+			const auto type_id = entt::hashed_string(k.data(), k.size());
+			const auto deserl_fn_it = sjc._deserl.find(type_id);
+			if (deserl_fn_it != sjc._deserl.cend()) {
+				// TODO: check return value
+				deserl_fn_it->second(oh, v);
+			} else {
+				std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n";
+			}
+		}
+		scanned_objs.push_back(oh);
+	}
+
+	// TODO: mutex and move code to async and return this list?
+
+	// throw new frag event here, after loading them all
+	for (const Object o : scanned_objs) {
+		_os.throwEventConstruct(o);
+	}
+
+	return scanned_objs.size();
+}
+
+void FilesystemStorage::scanPathAsync(std::string path) {
+	// add path to queue
+	// HACK: if path is known/any fragment is in the path, this operation blocks (non async)
+	scanPath(path); // TODO: make async and post result
+}
+
+} // backend
+
diff --git a/src/solanaceae/object_store/backends/filesystem_storage.hpp b/src/solanaceae/object_store/backends/filesystem_storage.hpp
new file mode 100644
index 0000000..037a14c
--- /dev/null
+++ b/src/solanaceae/object_store/backends/filesystem_storage.hpp
@@ -0,0 +1,41 @@
+#pragma once
+
+#include "../types.hpp"
+#include "../object_store.hpp"
+
+#include <string>
+
+namespace backend {
+
+struct FilesystemStorage : public StorageBackendI {
+	FilesystemStorage(
+		ObjectStore2& os,
+		std::string_view storage_path = "test_obj_store",
+		MetaFileType mft_new = MetaFileType::BINARY_MSGPACK
+	);
+	~FilesystemStorage(void);
+
+	// TODO: fix the path for this specific fs?
+	// for now we assume a single storage path per backend (there can be multiple per type)
+	std::string _storage_path;
+
+	// meta file type for new objects
+	MetaFileType _mft_new {MetaFileType::BINARY_MSGPACK};
+
+	ObjectHandle newObject(ByteSpan id) override;
+
+	bool write(Object o, std::function<uint64_t(uint8_t*, uint64_t)>& data_cb) override;
+	bool read(Object o, std::function<void(const ByteSpan)>& data_cb) override;
+
+	//// convenience function
+	//nlohmann::json loadFromStorageNJ(FragmentID fid);
+
+	void scanAsync(void);
+
+	private:
+		size_t scanPath(std::string_view path);
+		void scanPathAsync(std::string path);
+};
+
+} // backend
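
Usage sketch (annotation, not part of the patch; assumes a default-constructible
ObjectStore2 as used above):

	ObjectStore2 os;
	backend::FilesystemStorage fsb{os, "test_obj_store", MetaFileType::BINARY_MSGPACK};
	fsb.scanAsync(); // currently blocks, see the HACK note in scanPathAsync()
	// newObject()/write() then produce "<storage>/<2 hex>/<rest>" plus a ".meta.*" sidecar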