loading fragments mostly working (not notifying anyone yet)

This commit is contained in:
Green Sky 2024-02-15 15:06:51 +01:00
parent d278391528
commit 0b0245d844
No known key found for this signature in database
5 changed files with 241 additions and 5 deletions

View File

@ -3,6 +3,8 @@
#include <solanaceae/util/utils.hpp> #include <solanaceae/util/utils.hpp>
#include <entt/entity/handle.hpp> #include <entt/entity/handle.hpp>
#include <entt/container/dense_set.hpp>
#include <entt/core/hashed_string.hpp>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
@ -13,6 +15,7 @@
#include <mutex> #include <mutex>
#include <type_traits> #include <type_traits>
#include <utility> #include <utility>
#include <algorithm>
#include <iostream> #include <iostream>
#include <vector> #include <vector>
@ -51,7 +54,7 @@ FragmentStore::FragmentStore(
registerSerializers(); registerSerializers();
} }
FragmentStore::FragmentHandle FragmentStore::fragmentHandle(FragmentID fid) { FragmentHandle FragmentStore::fragmentHandle(FragmentID fid) {
return {_reg, fid}; return {_reg, fid};
} }
@ -340,6 +343,184 @@ bool FragmentStore::syncToStorage(FragmentID fid, const uint8_t* data, const uin
return syncToStorage(fid, fn_cb); return syncToStorage(fid, fn_cb);
} }
size_t FragmentStore::scanStoragePath(std::string_view path) {
if (path.empty()) {
path = _default_store_path;
}
// TODO: extract so async can work (or/and make iteratable generator)
if (!std::filesystem::is_directory(path)) {
std::cerr << "FS error: scan path not a directory '" << path << "'\n";
return 0;
}
// step 1: make snapshot of files, validate metafiles and save id/path+meta.ext
// can be extra thread (if non vfs)
struct FragFileEntry {
std::string id_str;
std::filesystem::path frag_path;
std::string meta_ext;
bool operator==(const FragFileEntry& other) const {
// only compare by id
return id_str == other.id_str;
}
};
struct FragFileEntryHash {
size_t operator()(const FragFileEntry& it) const {
return entt::hashed_string(it.id_str.data(), it.id_str.size());
}
};
entt::dense_set<FragFileEntry, FragFileEntryHash> file_frag_list;
std::filesystem::path storage_path{path};
auto handle_file = [&](const std::filesystem::path& file_path) {
if (!std::filesystem::is_regular_file(file_path)) {
return;
}
// handle file
if (file_path.has_extension()) {
// skip over metadata, assuming only metafiles have extentions (might be wrong?)
// also skips temps
return;
}
auto relative_path = std::filesystem::proximate(file_path, storage_path);
std::string id_str = relative_path.generic_u8string();
// delete all '/'
id_str.erase(std::remove(id_str.begin(), id_str.end(), '/'), id_str.end());
if (id_str.size() % 2 != 0) {
std::cerr << "FS error: non hex fragment uid detected: '" << id_str << "'\n";
}
if (file_frag_list.contains(FragFileEntry{id_str, {}, ""})) {
std::cerr << "FS error: fragment duplicate detected: '" << id_str << "'\n";
return; // skip
}
const char* meta_ext = ".meta.msgpack";
{ // find meta
// TODO: this as to know all possible extentions
bool has_meta_msgpack = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.msgpack");
bool has_meta_json = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.json");
const size_t meta_sum =
(has_meta_msgpack?1:0) +
(has_meta_json?1:0)
;
if (meta_sum > 1) { // has multiple
std::cerr << "FS error: fragment with multiple meta files detected: " << id_str << "\n";
return; // skip
}
if (meta_sum == 0) {
std::cerr << "FS error: fragment missing meta file detected: " << id_str << "\n";
return; // skip
}
if (has_meta_json) {
meta_ext = ".meta.json";
}
}
file_frag_list.emplace(FragFileEntry{
std::move(id_str),
file_path,
meta_ext
});
};
for (const auto& outer_path : std::filesystem::directory_iterator(storage_path)) {
if (std::filesystem::is_regular_file(outer_path)) {
handle_file(outer_path);
} else if (std::filesystem::is_directory(outer_path)) {
// subdir, part of id
for (const auto& inner_path : std::filesystem::directory_iterator(outer_path)) {
//if (std::filesystem::is_regular_file(inner_path)) {
//// handle file
//} // TODO: support deeper recursion?
handle_file(inner_path);
}
}
}
std::cout << "FS: scan found:\n";
for (const auto& it : file_frag_list) {
std::cout << " " << it.id_str << "\n";
}
// step 2: check if files preexist in reg
// main thread
// (merge into step 3 and do more error checking?)
for (auto it = file_frag_list.begin(); it != file_frag_list.end();) {
auto id = hex2bin(it->id_str);
auto fid = getFragmentByID(id);
if (_reg.valid(fid)) {
// pre exising (handle differently??)
// check if store differs?
it = file_frag_list.erase(it);
} else {
it++;
}
}
size_t count {0};
// step 3: parse meta and insert into reg of non preexising
// main thread
// TODO: check timestamps of preexisting and reload? mark external/remote dirty?
for (const auto& it : file_frag_list) {
nlohmann::json j;
if (it.meta_ext == ".meta.msgpack") {
// uh
// read binary header
} else if (it.meta_ext == ".meta.json") {
std::ifstream file(it.frag_path.generic_u8string() + it.meta_ext);
if (!file.is_open()) {
std::cout << "FS error: failed opening meta " << it.frag_path << "\n";
continue;
}
file >> j;
if (!j.is_object()) {
std::cerr << "FS error: json in meta is broken " << it.id_str << "\n";
continue;
}
// TODO: existing fragment file
//newFragmentFile();
FragmentHandle fh{_reg, _reg.create()};
fh.emplace<FragComp::ID>(hex2bin(it.id_str));
for (const auto& [k, v] : j.items()) {
// type id from string hash
const auto type_id = entt::hashed_string(k.data(), k.size());
const auto deserl_fn_it = _sc._deserl_json.find(type_id);
if (deserl_fn_it != _sc._deserl_json.cend()) {
// TODO: check return value
deserl_fn_it->second(fh, v);
} else {
std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n";
}
}
count++;
} else {
assert(false);
}
}
return count;
}
void FragmentStore::scanStoragePathAsync(std::string path) {
// add path to queue
// HACK: if path is known/any fragment is in the path, this operation blocks (non async)
scanStoragePath(path); // TODO: make async and post result
}
static bool serl_json_data_enc_type(void* comp, nlohmann::json& out) { static bool serl_json_data_enc_type(void* comp, nlohmann::json& out) {
if (comp == nullptr) { if (comp == nullptr) {
return false; return false;
@ -352,6 +533,20 @@ static bool serl_json_data_enc_type(void* comp, nlohmann::json& out) {
return true; return true;
} }
static bool deserl_json_data_enc_type(FragmentHandle fh, const nlohmann::json& in) {
// TODO: this is ugly in multiple places
try {
fh.emplace_or_replace<FragComp::DataEncryptionType>(
static_cast<Encryption>(
static_cast<std::underlying_type_t<Encryption>>(in)
)
);
} catch(...) {
return false;
}
return true;
}
static bool serl_json_data_comp_type(void* comp, nlohmann::json& out) { static bool serl_json_data_comp_type(void* comp, nlohmann::json& out) {
if (comp == nullptr) { if (comp == nullptr) {
return false; return false;
@ -366,6 +561,7 @@ static bool serl_json_data_comp_type(void* comp, nlohmann::json& out) {
void FragmentStore::registerSerializers(void) { void FragmentStore::registerSerializers(void) {
_sc.registerSerializerJson<FragComp::DataEncryptionType>(serl_json_data_enc_type); _sc.registerSerializerJson<FragComp::DataEncryptionType>(serl_json_data_enc_type);
_sc.registerDeSerializerJson<FragComp::DataEncryptionType>(deserl_json_data_enc_type);
_sc.registerSerializerJson<FragComp::DataCompressionType>(serl_json_data_comp_type); _sc.registerSerializerJson<FragComp::DataCompressionType>(serl_json_data_comp_type);
std::cout << "registered serl text json cbs:\n"; std::cout << "registered serl text json cbs:\n";

View File

@ -17,10 +17,11 @@
#include <cstdint> #include <cstdint>
#include <random> #include <random>
struct FragmentStore : public FragmentStoreI { // fwd
using FragmentHandle = entt::basic_handle<entt::basic_registry<FragmentID>>; struct SerializerCallbacks;
entt::basic_registry<FragmentID> _reg; struct FragmentStore : public FragmentStoreI {
FragmentRegistry _reg;
std::minstd_rand _rng{std::random_device{}()}; std::minstd_rand _rng{std::random_device{}()};
std::array<uint8_t, 8> _session_uuid_namespace; std::array<uint8_t, 8> _session_uuid_namespace;
@ -87,6 +88,9 @@ struct FragmentStore : public FragmentStoreI {
bool syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size); bool syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size);
// fragment discovery? // fragment discovery?
// returns number of new fragments
size_t scanStoragePath(std::string_view path);
void scanStoragePathAsync(std::string path);
private: private:
void registerSerializers(void); // internal comps void registerSerializers(void); // internal comps

View File

@ -1,9 +1,13 @@
#pragma once #pragma once
#include <entt/entity/registry.hpp>
#include <cstdint> #include <cstdint>
// internal id // internal id
enum class FragmentID : uint32_t {}; enum class FragmentID : uint32_t {};
using FragmentRegistry = entt::basic_registry<FragmentID>;
using FragmentHandle = entt::basic_handle<FragmentRegistry>;
struct FragmentStoreI { struct FragmentStoreI {
virtual ~FragmentStoreI(void) {} virtual ~FragmentStoreI(void) {}

View File

@ -64,6 +64,19 @@ static bool serl_json_msg_ts_range(void* comp, nlohmann::json& out) {
return true; return true;
} }
static bool deserl_json_msg_ts_range(FragmentHandle fh, const nlohmann::json& in) {
// TODO: this is ugly in multiple places
try {
fh.emplace_or_replace<FragComp::MessagesTSRange>(FragComp::MessagesTSRange{
in["begin"],
in["end"]
});
} catch(...) {
return false;
}
return true;
}
static bool serl_json_msg_c_id(void* comp, nlohmann::json& out) { static bool serl_json_msg_c_id(void* comp, nlohmann::json& out) {
if (comp == nullptr) { if (comp == nullptr) {
return false; return false;
@ -78,6 +91,18 @@ static bool serl_json_msg_c_id(void* comp, nlohmann::json& out) {
return true; return true;
} }
static bool deserl_json_msg_c_id(FragmentHandle fh, const nlohmann::json& in) {
// TODO: this is ugly in multiple places
try {
fh.emplace_or_replace<FragComp::MessagesContact>(FragComp::MessagesContact{
in["id"]
});
} catch(...) {
return false;
}
return true;
}
void MessageFragmentStore::handleMessage(const Message3Handle& m) { void MessageFragmentStore::handleMessage(const Message3Handle& m) {
if (!static_cast<bool>(m)) { if (!static_cast<bool>(m)) {
return; // huh? return; // huh?
@ -210,7 +235,9 @@ MessageFragmentStore::MessageFragmentStore(
_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy); _rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);
_fs._sc.registerSerializerJson<FragComp::MessagesTSRange>(serl_json_msg_ts_range); _fs._sc.registerSerializerJson<FragComp::MessagesTSRange>(serl_json_msg_ts_range);
_fs._sc.registerDeSerializerJson<FragComp::MessagesTSRange>(deserl_json_msg_ts_range);
_fs._sc.registerSerializerJson<FragComp::MessagesContact>(serl_json_msg_c_id); _fs._sc.registerSerializerJson<FragComp::MessagesContact>(serl_json_msg_c_id);
_fs._sc.registerDeSerializerJson<FragComp::MessagesContact>(deserl_json_msg_c_id);
_sc.registerSerializerJson<Message::Components::Timestamp>(serl_json_default<Message::Components::Timestamp>); _sc.registerSerializerJson<Message::Components::Timestamp>(serl_json_default<Message::Components::Timestamp>);
_sc.registerSerializerJson<Message::Components::TimestampProcessed>(serl_json_default<Message::Components::TimestampProcessed>); _sc.registerSerializerJson<Message::Components::TimestampProcessed>(serl_json_default<Message::Components::TimestampProcessed>);
@ -227,9 +254,12 @@ MessageFragmentStore::MessageFragmentStore(
_sc.registerSerializerJson<Message::Components::Transfer::FileInfo>(serl_json_default<Message::Components::Transfer::FileInfo>); _sc.registerSerializerJson<Message::Components::Transfer::FileInfo>(serl_json_default<Message::Components::Transfer::FileInfo>);
_sc.registerSerializerJson<Message::Components::Transfer::FileInfoLocal>(serl_json_default<Message::Components::Transfer::FileInfoLocal>); _sc.registerSerializerJson<Message::Components::Transfer::FileInfoLocal>(serl_json_default<Message::Components::Transfer::FileInfoLocal>);
_sc.registerSerializerJson<Message::Components::Transfer::TagHaveAll>(serl_json_default<Message::Components::Transfer::TagHaveAll>); _sc.registerSerializerJson<Message::Components::Transfer::TagHaveAll>(serl_json_default<Message::Components::Transfer::TagHaveAll>);
_fs.scanStoragePath("test_message_store/");
} }
MessageFragmentStore::~MessageFragmentStore(void) { MessageFragmentStore::~MessageFragmentStore(void) {
// TODO: sync all dirty fragments
} }
float MessageFragmentStore::tick(float time_delta) { float MessageFragmentStore::tick(float time_delta) {

View File

@ -5,13 +5,15 @@
#include <nlohmann/json_fwd.hpp> #include <nlohmann/json_fwd.hpp>
#include "./fragment_store_i.hpp"
struct SerializerCallbacks { struct SerializerCallbacks {
// nlohmann // nlohmann
// json/msgpack // json/msgpack
using serialize_json_fn = bool(*)(void* comp, nlohmann::json& out); using serialize_json_fn = bool(*)(void* comp, nlohmann::json& out);
entt::dense_map<entt::id_type, serialize_json_fn> _serl_json; entt::dense_map<entt::id_type, serialize_json_fn> _serl_json;
using deserialize_json_fn = bool(*)(void* comp, const nlohmann::json& in); using deserialize_json_fn = bool(*)(FragmentHandle fh, const nlohmann::json& in);
entt::dense_map<entt::id_type, deserialize_json_fn> _deserl_json; entt::dense_map<entt::id_type, deserialize_json_fn> _deserl_json;
void registerSerializerJson(serialize_json_fn fn, const entt::type_info& type_info); void registerSerializerJson(serialize_json_fn fn, const entt::type_info& type_info);