From 267f8dffc18b269f01662b0172b3f07aa17cf31c Mon Sep 17 00:00:00 2001
From: Green Sky
Date: Tue, 13 Feb 2024 00:15:18 +0100
Subject: [PATCH] working prototyping code

---
 external/CMakeLists.txt                 |  11 +-
 flake.nix                               |   2 +
 src/CMakeLists.txt                      |  24 ++
 src/fragment_store/fragment_store.cpp   | 318 ++++++++++++++++++++++++
 src/fragment_store/fragment_store.hpp   | 171 +++++++++++++
 src/fragment_store/fragment_store_i.hpp |  11 +
 src/fragment_store/test_fragstore.cpp   |  82 ++++++
 7 files changed, 618 insertions(+), 1 deletion(-)
 create mode 100644 src/fragment_store/fragment_store.cpp
 create mode 100644 src/fragment_store/fragment_store.hpp
 create mode 100644 src/fragment_store/fragment_store_i.hpp
 create mode 100644 src/fragment_store/test_fragstore.cpp

diff --git a/external/CMakeLists.txt b/external/CMakeLists.txt
index 59914aba..0748b3fd 100644
--- a/external/CMakeLists.txt
+++ b/external/CMakeLists.txt
@@ -1,4 +1,4 @@
-cmake_minimum_required(VERSION 3.9 FATAL_ERROR)
+cmake_minimum_required(VERSION 3.14...3.24 FATAL_ERROR)
 
 add_subdirectory(./entt)
 
@@ -19,3 +19,12 @@ add_subdirectory(./stb)
 add_subdirectory(./libwebp)
 
 add_subdirectory(./qoi)
+if (NOT TARGET nlohmann_json::nlohmann_json)
+	FetchContent_Declare(json
+		URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz
+		URL_HASH SHA256=d6c65aca6b1ed68e7a182f4757257b107ae403032760ed6ef121c9d55e81757d
+		EXCLUDE_FROM_ALL
+	)
+	FetchContent_MakeAvailable(json)
+endif()
+
diff --git a/flake.nix b/flake.nix
index f957dd0b..9f580c7a 100644
--- a/flake.nix
+++ b/flake.nix
@@ -58,6 +58,8 @@
           cmakeFlags = [
             "-DTOMATO_ASAN=OFF"
             "-DCMAKE_BUILD_TYPE=RelWithDebInfo"
+            "-DFETCHCONTENT_SOURCE_DIR_JSON=${pkgs.nlohmann_json.src}" # we care less about version here
+            # do we really care less about the version? do we need a stable abi?
           ];
 
           # TODO: replace with install command
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 6328a18f..375f95ba 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,5 +1,29 @@
 cmake_minimum_required(VERSION 3.9 FATAL_ERROR)
 
+add_library(fragment_store
+	./fragment_store/fragment_store_i.hpp
+	./fragment_store/fragment_store.hpp
+	./fragment_store/fragment_store.cpp
+)
+
+target_link_libraries(fragment_store PUBLIC
+	nlohmann_json::nlohmann_json
+	EnTT::EnTT
+	solanaceae_util
+)
+
+########################################
+
+add_executable(fragment_store_test
+	fragment_store/test_fragstore.cpp
+)
+
+target_link_libraries(fragment_store_test PUBLIC
+	fragment_store
+)
+
+########################################
+
 add_executable(tomato
 	./main.cpp
 	./icon.rc
diff --git a/src/fragment_store/fragment_store.cpp b/src/fragment_store/fragment_store.cpp
new file mode 100644
index 00000000..fad12464
--- /dev/null
+++ b/src/fragment_store/fragment_store.cpp
@@ -0,0 +1,318 @@
+#include "./fragment_store.hpp"
+
+#include <solanaceae/util/utils.hpp>
+
+#include <entt/entity/handle.hpp>
+
+#include <nlohmann/json.hpp>
+
+#include <array>
+#include <cstdint>
+#include <filesystem>
+#include <fstream>
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include <iostream>
+#include <string>
+
+static const char* metaFileTypeSuffix(MetaFileType mft) {
+	switch (mft) {
+		case MetaFileType::TEXT_JSON: return ".json";
+		//case MetaFileType::BINARY_ARB: return ".bin";
+		case MetaFileType::BINARY_MSGPACK: return ".msgpack";
+	}
+	return ""; // .unk?
+}
+
+FragmentStore::FragmentStore(void) {
+	registerSerializers();
+}
+
+FragmentStore::FragmentStore(
+	std::array<uint8_t, 8> session_uuid_namespace
+) : _session_uuid_namespace(std::move(session_uuid_namespace)) {
+	registerSerializers();
+}
+
+entt::basic_handle<entt::basic_registry<FragmentID>> FragmentStore::fragmentHandle(FragmentID fid) {
+	return {_reg, fid};
+}
+
+FragmentID FragmentStore::newFragmentMemoryOwned(
+	const std::vector<uint8_t>& id,
+	size_t initial_size
+) {
+	{ // first check if id is already used
+		auto existing_id = getFragmentByID(id);
+		if (_reg.valid(existing_id)) {
+			return entt::null;
+		}
+	}
+
+	{ // next check if space in memory budget
+		const auto free_memory = _memory_budget - _memory_usage;
+		if (initial_size > free_memory) {
+			return entt::null;
+		}
+	}
+
+	// actually allocate and create
+	auto new_data = std::make_unique<std::vector<uint8_t>>(initial_size);
+	if (!static_cast<bool>(new_data)) {
+		// allocation failure
+		return entt::null;
+	}
+	_memory_usage += initial_size;
+
+	const auto new_frag = _reg.create();
+
+	_reg.emplace<Components::ID>(new_frag, id);
+	// TODO: memory comp
+	_reg.emplace<std::unique_ptr<std::vector<uint8_t>>>(new_frag) = std::move(new_data);
+
+	return new_frag;
+}
+
+FragmentID FragmentStore::newFragmentFile(
+	std::string_view store_path,
+	MetaFileType mft,
+	const std::vector<uint8_t>& id
+) {
+	{ // first check if id is already used
+		const auto existing_id = getFragmentByID(id);
+		if (_reg.valid(existing_id)) {
+			return entt::null;
+		}
+	}
+
+	if (store_path.empty()) {
+		store_path = _default_store_path;
+	}
+
+	std::filesystem::create_directories(store_path);
+
+	const auto id_hex = bin2hex(id);
+	std::filesystem::path fragment_file_path;
+
+	if (id_hex.size() < 6) {
+		fragment_file_path = std::filesystem::path{store_path}/id_hex;
+	} else {
+		// use the first 2hex (1byte) as a subfolder
+		std::filesystem::create_directories(std::string{store_path} + id_hex.substr(0, 2));
+		fragment_file_path = std::filesystem::path{std::string{store_path} + id_hex.substr(0, 2)} / id_hex.substr(2);
+	}
+
+	if (std::filesystem::exists(fragment_file_path)) {
+		return entt::null;
+	}
+
+	const auto new_frag = _reg.create();
+
+	_reg.emplace<Components::ID>(new_frag, id);
+
+	// file (info) comp
+	_reg.emplace<Components::Ephemeral::FilePath>(new_frag, fragment_file_path.generic_u8string());
+
+	_reg.emplace<Components::Ephemeral::MetaFileType>(new_frag, mft);
+
+	// meta needs to be synced to file
+	std::function<write_to_storage_fetch_data_cb> empty_data_cb = [](const uint8_t*, uint64_t) -> uint64_t { return 0; };
+	if (!syncToStorage(new_frag, empty_data_cb)) {
+		_reg.destroy(new_frag);
+		return entt::null;
+	}
+
+	return new_frag;
+}
+
+FragmentID FragmentStore::getFragmentByID(
+	const std::vector<uint8_t>& id
+) {
+	// TODO: accelerate
+	// maybe keep it sorted and binary search? hash table lookup?
+	for (const auto& [frag, id_comp] : _reg.view<Components::ID>().each()) {
+		if (id == id_comp.v) {
+			return frag;
+		}
+	}
+
+	return entt::null;
+}
+
+FragmentID FragmentStore::getFragmentCustomMatcher(
+	std::function<bool(FragmentID)>& fn
+) {
+	return entt::null;
+}
+
+template<typename F>
+static void writeBinaryMetafileHeader(F& file, const Encryption enc, const Compression comp) {
+	file.write("SOLMET", 6);
+	file.put(static_cast<std::underlying_type_t<Encryption>>(enc));
+
+	// TODO: is compressiontype encrypted?
+	file.put(static_cast<std::underlying_type_t<Compression>>(comp));
+}
+
+bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage_fetch_data_cb>& data_cb) {
+	if (!_reg.valid(fid)) {
+		return false;
+	}
+
+	if (!_reg.all_of<Components::Ephemeral::FilePath>(fid)) {
+		// not a file fragment?
+		return false;
+	}
+
+	// split object storage
+
+	MetaFileType meta_type = MetaFileType::TEXT_JSON; // TODO: better defaults
+	if (_reg.all_of<Components::Ephemeral::MetaFileType>(fid)) {
+		meta_type = _reg.get<Components::Ephemeral::MetaFileType>(fid).type;
+	}
+
+	Encryption meta_enc = Encryption::NONE; // TODO: better defaults
+	Compression meta_comp = Compression::NONE; // TODO: better defaults
+
+	if (meta_type != MetaFileType::TEXT_JSON) {
+		if (_reg.all_of<Components::Ephemeral::MetaEncryptionType>(fid)) {
+			meta_enc = _reg.get<Components::Ephemeral::MetaEncryptionType>(fid).enc;
+		}
+
+		if (_reg.all_of<Components::Ephemeral::MetaCompressionType>(fid)) {
+			meta_comp = _reg.get<Components::Ephemeral::MetaCompressionType>(fid).comp;
+		}
+	} else {
+		// we cant have encryption or compression
+
+		// TODO: warning/error?
+
+		// TODO: forcing for testing
+		//if (_reg.all_of<Components::Ephemeral::MetaEncryptionType>(fid)) {
+			_reg.emplace_or_replace<Components::Ephemeral::MetaEncryptionType>(fid, Encryption::NONE);
+		//}
+		//if (_reg.all_of<Components::Ephemeral::MetaCompressionType>(fid)) {
+			_reg.emplace_or_replace<Components::Ephemeral::MetaCompressionType>(fid, Compression::NONE);
+		//}
+	}
+
+	std::ofstream meta_file{
+		_reg.get<Components::Ephemeral::FilePath>(fid).path + ".meta" + metaFileTypeSuffix(meta_type),
+		std::ios::out | std::ios::trunc | std::ios::binary // always binary, also for text
+	};
+
+	if (!meta_file.is_open()) {
+		return false;
+	}
+
+	std::ofstream data_file{
+		_reg.get<Components::Ephemeral::FilePath>(fid).path,
+		std::ios::out | std::ios::trunc | std::ios::binary // always binary, also for text
+	};
+
+	if (!data_file.is_open()) {
+		return false;
+	}
+
+	// metadata type
+	if (meta_type == MetaFileType::BINARY_MSGPACK) { // binary metadata file
+		writeBinaryMetafileHeader(meta_file, meta_enc, meta_comp);
+	}
+
+	// sharing code between binary msgpack and text json for now
+	nlohmann::json meta_data = nlohmann::json::object(); // metadata needs to be an object, null not allowed
+	// metadata file
+
+	for (const auto& [type_id, storage] : _reg.storage()) {
+		if (!storage.contains(fid)) {
+			continue;
+		}
+
+		std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n";
+
+		// use type_id to find serializer
+		auto s_cb_it = _sc._serl_json.find(type_id);
+		if (s_cb_it == _sc._serl_json.end()) {
+			// could not find serializer, not saving
+			continue;
+		}
+
+		// noooo, why cant numbers be keys
+		//if (meta_type == MetaFileType::BINARY_MSGPACK) { // msgpack uses the hash id instead
+			//s_cb_it->second(storage.value(fid), meta_data[storage.type().hash()]);
+		//} else if (meta_type == MetaFileType::TEXT_JSON) {
+			s_cb_it->second(storage.value(fid), meta_data[storage.type().name()]);
+		//}
+	}
+
+	if (meta_type == MetaFileType::BINARY_MSGPACK) { // binary metadata file
+		const auto res = nlohmann::json::to_msgpack(meta_data);
+		meta_file.write(reinterpret_cast<const char*>(res.data()), res.size());
+	} else if (meta_type == MetaFileType::TEXT_JSON) {
+		meta_file << meta_data.dump(2, ' ', true);
+	}
+
+	// now data
+	std::array<uint8_t, 1024> buffer;
+	uint64_t buffer_actual_size {0};
+	do {
+		buffer_actual_size = data_cb(buffer.data(), buffer.size());
+		if (buffer_actual_size == 0) {
+			break;
+		}
+		if (buffer_actual_size > buffer.size()) {
+			// wtf
+			break;
+		}
+
+		data_file.write(reinterpret_cast<const char*>(buffer.data()), buffer_actual_size);
+	} while (buffer_actual_size == buffer.size());
+
+	meta_file.flush();
+	data_file.flush();
+
+	// TODO: use temp files and move to old location
+
+	if (_reg.all_of<Components::Ephemeral::DirtyTag>(fid)) {
+		_reg.remove<Components::Ephemeral::DirtyTag>(fid);
+	}
+
+	return true;
+}
+
+static bool serl_json_data_enc_type(void* comp, nlohmann::json& out) {
+	if (comp == nullptr) {
+		return false;
+	}
+
+	auto& r_comp = *reinterpret_cast<Components::DataEncryptionType*>(comp);
+
+	out = static_cast<std::underlying_type_t<Encryption>>(r_comp.enc);
+
+	return true;
+}
+
+static bool serl_json_data_comp_type(void* comp, nlohmann::json& out) {
+	if (comp == nullptr) {
+		return false;
+	}
+
+	auto& r_comp = *reinterpret_cast<Components::DataCompressionType*>(comp);
+
+	out = static_cast<std::underlying_type_t<Compression>>(r_comp.comp);
+
+	return true;
+}
+
+void FragmentStore::registerSerializers(void) {
+	_sc.registerSerializerJson<Components::DataEncryptionType>(serl_json_data_enc_type);
+	_sc.registerSerializerJson<Components::DataCompressionType>(serl_json_data_comp_type);
+
+	std::cout << "registered serl text json cbs:\n";
+	for (const auto& [type_id, _] : _sc._serl_json) {
+		std::cout << " " << type_id << "\n";
+	}
+}
+
diff --git a/src/fragment_store/fragment_store.hpp b/src/fragment_store/fragment_store.hpp
new file mode 100644
index 00000000..58b3aef3
--- /dev/null
+++ b/src/fragment_store/fragment_store.hpp
@@ -0,0 +1,171 @@
+#pragma once
+
+#include "./fragment_store_i.hpp"
+#include "entt/entity/fwd.hpp"
+
+#include <entt/entity/registry.hpp>
+#include <entt/entity/handle.hpp>
+#include <entt/container/dense_map.hpp>
+#include <entt/core/type_info.hpp>
+
+#include <nlohmann/json_fwd.hpp>
+
+#include <vector>
+#include <array>
+#include <string>
+#include <functional>
+
+enum class Encryption : uint8_t {
+	NONE = 0x00,
+};
+enum class Compression : uint8_t {
+	NONE = 0x00,
+};
+enum class MetaFileType : uint8_t {
+	TEXT_JSON,
+	//BINARY_ARB,
+	BINARY_MSGPACK,
+};
+
+namespace Components {
+
+	// TODO: is this special and should this be saved to meta or not (its already in the file name on disk)
+	struct ID {
+		std::vector<uint8_t> v;
+	};
+
+	struct DataEncryptionType {
+		Encryption enc {Encryption::NONE};
+	};
+
+	struct DataCompressionType {
+		Compression comp {Compression::NONE};
+	};
+
+
+	// meta that is not written to (meta-)file
+	namespace Ephemeral {
+
+		// excluded from file meta
+		struct FilePath {
+			// contains store path, if any
+			std::string path;
+		};
+
+		// TODO: separate into remote and local?
+		// (remote meaning eg. the file on disk was changed by another program)
+		struct DirtyTag {};
+
+
+		// type as comp
+		struct MetaFileType {
+			::MetaFileType type {::MetaFileType::TEXT_JSON};
+		};
+
+		struct MetaEncryptionType {
+			Encryption enc {Encryption::NONE};
+		};
+
+		struct MetaCompressionType {
+			Compression comp {Compression::NONE};
+		};
+
+	} // Ephemeral
+
+} // Components
+
+struct SerializerCallbacks {
+	// nlohmann
+	// json/msgpack
+	using serialize_json_fn = bool(*)(void* comp, nlohmann::json& out);
+	entt::dense_map<entt::id_type, serialize_json_fn> _serl_json;
+
+	using deserialize_json_fn = bool(*)(void* comp, const nlohmann::json& in);
+	entt::dense_map<entt::id_type, deserialize_json_fn> _deserl_json;
+
+	void registerSerializerJson(serialize_json_fn fn, const entt::type_info& type_info) {
+		_serl_json[type_info.hash()] = fn;
+	}
+	template<typename T>
+	void registerSerializerJson(serialize_json_fn fn, const entt::type_info& type_info = entt::type_id<T>()) { registerSerializerJson(fn, type_info); }
+
+	void registerDeSerializerJson(deserialize_json_fn fn, const entt::type_info& type_info) {
+		_deserl_json[type_info.hash()] = fn;
+	}
+	template<typename T>
+	void registerDeSerializerJson(deserialize_json_fn fn, const entt::type_info& type_info = entt::type_id<T>()) { registerDeSerializerJson(fn, type_info); }
+};
+
+struct FragmentStore : public FragmentStoreI {
+	entt::basic_registry<FragmentID> _reg;
+
+	std::array<uint8_t, 8> _session_uuid_namespace;
+
+	std::string _default_store_path;
+
+	uint64_t _memory_budget {10u*1024u*1024u};
+	uint64_t _memory_usage {0u};
+
+	SerializerCallbacks _sc;
+
+	FragmentStore(void);
+	FragmentStore(std::array<uint8_t, 8> session_uuid_namespace);
+
+	// HACK: get access to the reg
+	entt::basic_handle<entt::basic_registry<FragmentID>> fragmentHandle(FragmentID fid);
+
+	// TODO: make the frags ref counted
+
+	// ========== new fragment ==========
+
+	// memory backed owned
+	FragmentID newFragmentMemoryOwned(
+		const std::vector<uint8_t>& id,
+		size_t initial_size
+	);
+
+	// memory backed view (can only be added? not new?)
+
+	// file backed (rw...)
+	// needs to know which store path to put into
+	FragmentID newFragmentFile(
+		std::string_view store_path,
+		MetaFileType mft,
+		const std::vector<uint8_t>& id
+	);
+	// this variant generates a new, mostly unique, id for us
+	FragmentID newFragmentFile(
+		std::string_view store_path,
+		MetaFileType mft
+	);
+
+	// ========== add fragment ==========
+
+	// ========== get fragment ==========
+	FragmentID getFragmentByID(
+		const std::vector<uint8_t>& id
+	);
+	FragmentID getFragmentCustomMatcher(
+		std::function<bool(FragmentID)>& fn
+	);
+
+	// remove fragment?
+
+	// syncs fragment to file
+	using write_to_storage_fetch_data_cb = uint64_t(uint8_t* request_buffer, uint64_t buffer_size);
+	// calls data_cb with a buffer to be filled in, cb returns actual count of data. if returned < max, its the last buffer.
+	bool syncToStorage(FragmentID fid, std::function<write_to_storage_fetch_data_cb>& data_cb);
+
+	// unload frags?
+	// if frags are file backed, we can close the file if not needed
+
+	// fragment discovery?
+
+	private:
+		void registerSerializers(void); // internal comps
+		// internal actual backends
+		bool syncToMemory(FragmentID fid, std::function<write_to_storage_fetch_data_cb>& data_cb);
+		bool syncToFile(FragmentID fid, std::function<write_to_storage_fetch_data_cb>& data_cb);
+};
+
diff --git a/src/fragment_store/fragment_store_i.hpp b/src/fragment_store/fragment_store_i.hpp
new file mode 100644
index 00000000..3a77f4c9
--- /dev/null
+++ b/src/fragment_store/fragment_store_i.hpp
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <cstdint>
+
+// internal id
+enum class FragmentID : uint32_t {};
+
+struct FragmentStoreI {
+	virtual ~FragmentStoreI(void) {}
+};
+
diff --git a/src/fragment_store/test_fragstore.cpp b/src/fragment_store/test_fragstore.cpp
new file mode 100644
index 00000000..898b3973
--- /dev/null
+++ b/src/fragment_store/test_fragstore.cpp
@@ -0,0 +1,82 @@
+#include <nlohmann/json.hpp>
+#include <cstdint>
+
+#include "./fragment_store.hpp"
+
+#include <functional>
+#include <string_view>
+
+namespace Components {
+	struct MessagesTimestampRange {
+		uint64_t begin {0};
+		uint64_t end {1000};
+	};
+} // Components
+
+
+static bool serl_json_msg_ts_range(void* comp, nlohmann::json& out) {
+	if (comp == nullptr) {
+		return false;
+	}
+
+	out = nlohmann::json::object();
+
+	auto& r_comp = *reinterpret_cast<Components::MessagesTimestampRange*>(comp);
+
+	out["begin"] = r_comp.begin;
+	out["end"] = r_comp.end;
+
+	return true;
+}
+
+int main(void) {
+	FragmentStore fs;
+	fs._default_store_path = "test_store/";
+	fs._sc.registerSerializerJson<Components::MessagesTimestampRange>(serl_json_msg_ts_range);
+
+	const auto frag0 = fs.newFragmentFile("", MetaFileType::TEXT_JSON, {0xff, 0xf1, 0xf2, 0xf0, 0xff, 0xff, 0xff, 0xf9});
+
+	const auto frag1 = fs.newFragmentFile("", MetaFileType::BINARY_MSGPACK, {0xff, 0xff, 0xf0, 0xf0, 0xf0, 0xf0, 0xff, 0xf9});
+
+	{
+		auto frag0h = fs.fragmentHandle(frag0);
+
+		frag0h.emplace_or_replace<Components::MessagesTimestampRange>();
+		frag0h.emplace_or_replace<Components::DataEncryptionType>();
+		frag0h.emplace_or_replace<Components::DataCompressionType>();
+
+		std::function<FragmentStore::write_to_storage_fetch_data_cb> fn_cb = [read = 0ul](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t {
+			uint64_t i = 0;
+			for (; i+read < 3000 && i < buffer_size; i++) {
+				request_buffer[i] = uint8_t((i+read) & 0xff);
+			}
+			read += i;
+
+			return i;
+		};
+		fs.syncToStorage(frag0, fn_cb);
+	}
+
+	{
+		auto frag1h = fs.fragmentHandle(frag1);
+
+		frag1h.emplace_or_replace<Components::DataEncryptionType>();
+		frag1h.emplace_or_replace<Components::DataCompressionType>();
+		//frag1h.emplace_or_replace<Components::Ephemeral::MetaFileType>(MetaFileType::BINARY_MSGPACK);
+
+		std::function<FragmentStore::write_to_storage_fetch_data_cb> fn_cb = [read = 0ul](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t {
+			static constexpr std::string_view text = "This is some random data";
+			uint64_t i = 0;
+			for (; i+read < text.size() && i < buffer_size; i++) {
+				request_buffer[i] = text[i+read];
+			}
+			read += i;
+
+			return i;
+		};
+		fs.syncToStorage(frag1, fn_cb);
+	}
+
+	return 0;
+}
+
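
Addendum (not part of the patch): SerializerCallbacks above also declares a
deserialize_json_fn type and registerDeSerializerJson(), but the prototype never
exercises the read side. A minimal sketch of a matching deserializer for the
test's Components::MessagesTimestampRange could look like the following; the
function name deserl_json_msg_ts_range is hypothetical and the keys simply
mirror what serl_json_msg_ts_range writes.

	// sketch only: counterpart to serl_json_msg_ts_range in test_fragstore.cpp
	static bool deserl_json_msg_ts_range(void* comp, const nlohmann::json& in) {
		if (comp == nullptr || !in.is_object()) {
			return false;
		}

		auto& r_comp = *reinterpret_cast<Components::MessagesTimestampRange*>(comp);

		// missing keys keep the member defaults
		r_comp.begin = in.value("begin", r_comp.begin);
		r_comp.end = in.value("end", r_comp.end);

		return true;
	}

	// registered the same way the serializer is registered in main():
	// fs._sc.registerDeSerializerJson<Components::MessagesTimestampRange>(deserl_json_msg_ts_range);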