From 0610a6a64a8d14795862047e476537573bc57f53 Mon Sep 17 00:00:00 2001
From: Green Sky
Date: Tue, 9 Apr 2024 12:09:21 +0200
Subject: [PATCH] continue os refactor, start with fs backend

---
 src/CMakeLists.txt                            |   2 +
 .../backends/filesystem_storage.cpp           | 237 ++++++++++++++++++
 .../backends/filesystem_storage.hpp           |  18 ++
 src/fragment_store/object_store.cpp           |  16 ++
 src/fragment_store/object_store.hpp           |  27 ++
 5 files changed, 300 insertions(+)
 create mode 100644 src/fragment_store/backends/filesystem_storage.cpp
 create mode 100644 src/fragment_store/backends/filesystem_storage.hpp

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index f83d2861..76b371e6 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -21,6 +21,8 @@ add_library(fragment_store
 	#new
 	./fragment_store/object_store.hpp
 	./fragment_store/object_store.cpp
+	./fragment_store/backends/filesystem_storage.hpp
+	./fragment_store/backends/filesystem_storage.cpp
 
 	./json/message_components.hpp # TODO: move
 	./json/tox_message_components.hpp # TODO: move
diff --git a/src/fragment_store/backends/filesystem_storage.cpp b/src/fragment_store/backends/filesystem_storage.cpp
new file mode 100644
index 00000000..895cad98
--- /dev/null
+++ b/src/fragment_store/backends/filesystem_storage.cpp
@@ -0,0 +1,237 @@
+#include "./filesystem_storage.hpp"
+
+#include "../meta_components.hpp"
+
+#include
+
+#include
+#include
+
+#include "../file2_stack.hpp"
+
+#include <filesystem>
+
+#include <iostream>
+
+static const char* metaFileTypeSuffix(MetaFileType mft) {
+	switch (mft) {
+		case MetaFileType::TEXT_JSON: return ".json";
+		//case MetaFileType::BINARY_ARB: return ".bin";
+		case MetaFileType::BINARY_MSGPACK: return ".msgpack";
+	}
+	return ""; // .unk?
+}
+
+namespace backend {
+
+FilesystemStorage::FilesystemStorage(ObjectStore2& os) : StorageBackendI::StorageBackendI(os) {
+}
+
+FilesystemStorage::~FilesystemStorage(void) {
+}
+
+bool FilesystemStorage::write(Object o, std::function<write_to_storage_fetch_data_cb>& data_cb) {
+	auto& reg = _os.registry();
+
+	if (!reg.valid(o)) {
+		return false;
+	}
+
+	ObjectHandle oh {reg, o};
+
+	if (!oh.all_of()) {
+		// not a file fragment?
+		return false;
+	}
+
+	// split object storage
+
+	MetaFileType meta_type = MetaFileType::TEXT_JSON; // TODO: better defaults
+	if (reg.all_of(o)) {
+		meta_type = oh.get().type;
+	}
+
+	Encryption meta_enc = Encryption::NONE; // TODO: better defaults
+	Compression meta_comp = Compression::NONE; // TODO: better defaults
+
+	if (meta_type != MetaFileType::TEXT_JSON) {
+		if (oh.all_of()) {
+			meta_enc = oh.get().enc;
+		}
+
+		if (oh.all_of()) {
+			meta_comp = oh.get().comp;
+		}
+	} else {
+		// we cant have encryption or compression
+		// so we force NONE for TEXT JSON
+
+		oh.emplace_or_replace(Encryption::NONE);
+		oh.emplace_or_replace(Compression::NONE);
+	}
+
+	std::filesystem::path meta_tmp_path = oh.get().path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp";
+	meta_tmp_path.replace_filename("." + meta_tmp_path.filename().generic_u8string());
+	// TODO: make meta comp work with mem compressor
+	//auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp);
+	std::stack> meta_file_stack;
+	meta_file_stack.push(std::make_unique(std::string_view{meta_tmp_path.generic_u8string()}));
+
+	if (meta_file_stack.empty()) {
+		std::cerr << "FS error: failed to create temporary meta file stack\n";
+		std::filesystem::remove(meta_tmp_path); // might have created an empty file
+		return false;
+	}
+
+	Encryption data_enc = Encryption::NONE; // TODO: better defaults
+	Compression data_comp = Compression::NONE; // TODO: better defaults
+	if (oh.all_of()) {
+		data_enc = oh.get().enc;
+	}
+	if (oh.all_of()) {
+		data_comp = oh.get().comp;
+	}
+
+	std::filesystem::path data_tmp_path = oh.get().path + ".tmp";
+	data_tmp_path.replace_filename("." + data_tmp_path.filename().generic_u8string());
+	auto data_file_stack = buildFileStackWrite(std::string_view{data_tmp_path.generic_u8string()}, data_enc, data_comp);
+	if (data_file_stack.empty()) {
+		while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
+		std::filesystem::remove(meta_tmp_path);
+		std::cerr << "FS error: failed to create temporary data file stack\n";
+		return false;
+	}
+
+	try { // TODO: properly sanitize strings, so this cant throw
+		// sharing code between binary msgpack and text json for now
+		nlohmann::json meta_data_j = nlohmann::json::object(); // metadata needs to be an object, null not allowed
+		// metadata file
+
+		for (const auto& [type_id, storage] : reg.storage()) {
+			if (!storage.contains(o)) {
+				continue;
+			}
+
+			//std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n";
+
+			// use type_id to find serializer
+			auto s_cb_it = _os._sc._serl_json.find(type_id);
+			if (s_cb_it == _os._sc._serl_json.end()) {
+				// could not find serializer, not saving
+				continue;
+			}
+
+			// noooo, why cant numbers be keys
+			//if (meta_type == MetaFileType::BINARY_MSGPACK) { // msgpack uses the hash id instead
+				//s_cb_it->second(storage.value(fid), meta_data[storage.type().hash()]);
+			//} else if (meta_type == MetaFileType::TEXT_JSON) {
+				s_cb_it->second(oh, meta_data_j[storage.type().name()]);
+			//}
+		}
+
+		if (meta_type == MetaFileType::BINARY_MSGPACK) { // binary metadata file
+			std::vector<uint8_t> binary_meta_data;
+			{
+				std::stack> binary_writer_stack;
+				binary_writer_stack.push(std::make_unique(binary_meta_data));
+
+				if (!buildStackWrite(binary_writer_stack, meta_enc, meta_comp)) {
+					while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
+					std::filesystem::remove(meta_tmp_path);
+					while (!data_file_stack.empty()) { data_file_stack.pop(); }
+					std::filesystem::remove(data_tmp_path);
+					std::cerr << "FS error: binary writer creation failed '" << oh.get().path << "'\n";
+					return false;
+				}
+
+				{
+					const std::vector<uint8_t> meta_data = nlohmann::json::to_msgpack(meta_data_j);
+					if (!binary_writer_stack.top()->write(ByteSpan{meta_data})) {
+						// i feel like exceptions or refactoring would be nice here
+						while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
+						std::filesystem::remove(meta_tmp_path);
+						while (!data_file_stack.empty()) { data_file_stack.pop(); }
+						std::filesystem::remove(data_tmp_path);
+						std::cerr << "FS error: binary writer failed '" << oh.get().path << "'\n";
+						return false;
+					}
+				}
+			}
+
+			//// the meta file is itself msgpack data
+			nlohmann::json meta_header_j = nlohmann::json::array();
+			meta_header_j.emplace_back() = "SOLMET";
+			meta_header_j.push_back(meta_enc);
+			meta_header_j.push_back(meta_comp);
+
+			// with a custom msgpack impl like cmp, we can be smarter here and dont need an extra buffer
+			meta_header_j.push_back(nlohmann::json::binary(binary_meta_data));
+
+			const auto meta_header_data = nlohmann::json::to_msgpack(meta_header_j);
+			meta_file_stack.top()->write({meta_header_data.data(), meta_header_data.size()});
+		} else if (meta_type == MetaFileType::TEXT_JSON) {
+			// cant be compressed or encrypted
+			const auto meta_file_json_str = meta_data_j.dump(2, ' ', true);
+			meta_file_stack.top()->write({reinterpret_cast<const uint8_t*>(meta_file_json_str.data()), meta_file_json_str.size()});
+		}
+
+	} catch (...) {
+		while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
+		std::filesystem::remove(meta_tmp_path);
+		while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
+		std::filesystem::remove(data_tmp_path);
+		std::cerr << "FS error: failed to serialize json data\n";
+		return false;
+	}
+
+	// now data
+	// for zstd compression, chunk size is frame size. (no cross frame referencing)
+	// TODO: add buffering stream layer
+	static constexpr int64_t chunk_size{1024*1024}; // 1MiB should be enough
+	std::vector<uint8_t> buffer(chunk_size);
+	uint64_t buffer_actual_size {0};
+	do {
+		buffer_actual_size = data_cb(buffer.data(), buffer.size());
+		if (buffer_actual_size == 0) {
+			break;
+		}
+		if (buffer_actual_size > buffer.size()) {
+			// wtf
+			break;
+		}
+
+		data_file_stack.top()->write({buffer.data(), buffer_actual_size});
+	} while (buffer_actual_size == buffer.size());
+
+	//meta_file.flush();
+	//meta_file.close();
+	while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
+	//data_file.flush();
+	//data_file.close();
+	while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
+
+	std::filesystem::rename(
+		meta_tmp_path,
+		oh.get().path + ".meta" + metaFileTypeSuffix(meta_type)
+	);
+
+	std::filesystem::rename(
+		data_tmp_path,
+		oh.get().path
+	);
+
+	// TODO: check return value of renames
+
+	if (oh.all_of()) {
+		oh.remove();
+	}
+
+	return true;
+}
+
+bool FilesystemStorage::read(Object o, std::function<read_from_storage_put_data_cb>& data_cb) {
+	return false;
+}
+
+} // backend
+
diff --git a/src/fragment_store/backends/filesystem_storage.hpp b/src/fragment_store/backends/filesystem_storage.hpp
new file mode 100644
index 00000000..715bf91d
--- /dev/null
+++ b/src/fragment_store/backends/filesystem_storage.hpp
@@ -0,0 +1,18 @@
+#pragma once
+
+#include "../object_store.hpp"
+
+namespace backend {
+
+struct FilesystemStorage : public StorageBackendI {
+	FilesystemStorage(ObjectStore2& os);
+	~FilesystemStorage(void);
+
+	bool write(Object o, std::function<write_to_storage_fetch_data_cb>& data_cb) override;
+	bool read(Object o, std::function<read_from_storage_put_data_cb>& data_cb) override;
+
+	//// convenience function
+	//nlohmann::json loadFromStorageNJ(FragmentID fid);
+};
+
+} // backend
diff --git a/src/fragment_store/object_store.cpp b/src/fragment_store/object_store.cpp
index 0806c618..4fc77860 100644
--- a/src/fragment_store/object_store.cpp
+++ b/src/fragment_store/object_store.cpp
@@ -6,6 +6,22 @@
 #include
 
+StorageBackendI::StorageBackendI(ObjectStore2& os) : _os(os) {
+}
+
+bool StorageBackendI::write(Object o, const ByteSpan data) {
+	std::function<write_to_storage_fetch_data_cb> fn_cb = [read = 0ull, data](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t {
+		uint64_t i = 0;
+		for (; i+read < data.size && i < buffer_size; i++) {
+			request_buffer[i] = data[i+read];
+		}
+		read += i;
+
+		return i;
+	};
+	return write(o, fn_cb);
+}
+
 static bool serl_json_data_enc_type(const ObjectHandle oh, nlohmann::json& out) {
 	out = static_cast>(
 		oh.get().enc
diff --git a/src/fragment_store/object_store.hpp b/src/fragment_store/object_store.hpp
index 764a9794..bcfff21f 100644
--- a/src/fragment_store/object_store.hpp
+++ b/src/fragment_store/object_store.hpp
@@ -1,6 +1,7 @@
 #pragma once
 
 #include
+#include
 
 #include "./serializer.hpp" // TODO: get rid of the tight nljson integration
 
@@ -14,6 +15,28 @@ enum class Object : uint32_t {};
 using ObjectRegistry = entt::basic_registry<Object>;
 using ObjectHandle = entt::basic_handle<ObjectRegistry>;
 
+// fwd
+struct ObjectStore2;
+
+struct StorageBackendI {
+	// OR or OS ?
+	ObjectStore2& _os;
+
+	StorageBackendI(ObjectStore2& os);
+
+	// ========== write object to storage ==========
+	using write_to_storage_fetch_data_cb = uint64_t(uint8_t* request_buffer, uint64_t buffer_size);
+	// calls data_cb with a buffer to be filled in, cb returns actual count of data. if returned < max, its the last buffer.
+	virtual bool write(Object o, std::function<write_to_storage_fetch_data_cb>& data_cb) = 0;
+	//virtual bool write(Object o, const uint8_t* data, const uint64_t data_size); // default impl
+	virtual bool write(Object o, const ByteSpan data); // default impl
+
+	// ========== read object from storage ==========
+	using read_from_storage_put_data_cb = void(const ByteSpan buffer);
+	virtual bool read(Object o, std::function<read_from_storage_put_data_cb>& data_cb) = 0;
+
+};
+
 namespace ObjectStore::Events {
 
 struct ObjectConstruct {
@@ -54,12 +77,16 @@ struct ObjectStore2 : public ObjectStoreEventProviderI {
 
 	SerializerCallbacks _sc;
 
+	// TODO: default backend?
+
 	ObjectStore2(void);
 	virtual ~ObjectStore2(void);
 
 	ObjectRegistry& registry(void);
 	ObjectHandle objectHandle(const Object o);
 
+	// sync?
+
 	void throwEventConstruct(const Object o);
 	void throwEventUpdate(const Object o);
 	void throwEventDestroy(const Object o);
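
Illustrative usage sketch (not part of the patch above): driving the new chunked StorageBackendI::write() interface from an in-memory buffer, the same way the ByteSpan default implementation in object_store.cpp does. StorageBackendI, Object and write_to_storage_fetch_data_cb come from the patch; the helper name, the include path and the loop details are assumptions.

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <functional>
#include <vector>

#include "./object_store.hpp" // assumed include path for StorageBackendI / Object

// hypothetical helper: stream a whole in-memory buffer through the fetch callback
static bool write_vector(StorageBackendI& backend, Object o, const std::vector<uint8_t>& data) {
	uint64_t offset = 0;
	std::function<StorageBackendI::write_to_storage_fetch_data_cb> cb =
		[&](uint8_t* request_buffer, uint64_t buffer_size) -> uint64_t {
			// fill the buffer provided by the backend; returning less than
			// buffer_size marks the last chunk
			const uint64_t to_copy = std::min<uint64_t>(buffer_size, data.size() - offset);
			if (to_copy > 0) {
				std::memcpy(request_buffer, data.data() + offset, to_copy);
			}
			offset += to_copy;
			return to_copy;
		};
	return backend.write(o, cb); // dispatches to e.g. backend::FilesystemStorage::write()
}

The base-class ByteSpan overload added in object_store.cpp covers the same case; a helper like this is only needed when the caller wants to keep control of the chunking loop itself.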