move mfs to os, works, convert tool still incomplete

This commit is contained in:
Green Sky 2024-04-11 11:54:00 +02:00
parent 73180195fe
commit a9f6a5d763
No known key found for this signature in database
11 changed files with 121 additions and 1198 deletions

View File

@ -12,13 +12,6 @@ add_library(fragment_store
./fragment_store/meta_components_id.inl
./fragment_store/file2_stack.hpp
./fragment_store/file2_stack.cpp
#old
./fragment_store/serializer.hpp
./fragment_store/fragment_store_i.hpp
./fragment_store/fragment_store_i.cpp
./fragment_store/fragment_store.hpp
./fragment_store/fragment_store.cpp
#new
./fragment_store/serializer_json.hpp
./fragment_store/object_store.hpp
./fragment_store/object_store.cpp
@ -71,16 +64,6 @@ target_link_libraries(message_fragment_store PUBLIC
########################################
add_executable(test_fragment_store
fragment_store/test_fragstore.cpp
)
target_link_libraries(test_fragment_store PUBLIC
fragment_store
)
########################################
add_executable(convert_frag_to_obj
fragment_store/convert_frag_to_obj.cpp
)

View File

@ -1,745 +0,0 @@
#include "./fragment_store.hpp"
#include <solanaceae/util/utils.hpp>
#include <entt/entity/handle.hpp>
#include <entt/container/dense_set.hpp>
#include <entt/core/hashed_string.hpp>
#include <nlohmann/json.hpp>
#include <solanaceae/file/file2_std.hpp>
#include <solanaceae/file/file2_mem.hpp>
#include "./file2_stack.hpp"
#include <cstdint>
#include <fstream>
#include <filesystem>
#include <memory>
#include <mutex>
#include <type_traits>
#include <utility>
#include <algorithm>
#include <stack>
#include <string_view>
#include <vector>
#include <variant>
#include <iostream>
static const char* metaFileTypeSuffix(MetaFileType mft) {
switch (mft) {
case MetaFileType::TEXT_JSON: return ".json";
//case MetaFileType::BINARY_ARB: return ".bin";
case MetaFileType::BINARY_MSGPACK: return ".msgpack";
}
return ""; // .unk?
}
// TODO: move to ... somewhere. (span? file2i?)
static ByteSpan spanFromRead(const std::variant<ByteSpan, std::vector<uint8_t>>& data_var) {
if (std::holds_alternative<std::vector<uint8_t>>(data_var)) {
auto& vec = std::get<std::vector<uint8_t>>(data_var);
return {vec.data(), vec.size()};
} else if (std::holds_alternative<ByteSpan>(data_var)) {
return std::get<ByteSpan>(data_var);
} else {
assert(false);
return {};
}
}
FragmentStore::FragmentStore(void) {
registerSerializers();
}
FragmentStore::FragmentStore(
std::array<uint8_t, 16> session_uuid_namespace
) : _session_uuid_gen(std::move(session_uuid_namespace)) {
registerSerializers();
}
std::vector<uint8_t> FragmentStore::generateNewUID(void) {
return _session_uuid_gen();
}
FragmentID FragmentStore::newFragmentMemoryOwned(
const std::vector<uint8_t>& id,
size_t initial_size
) {
{ // first check if id is already used
auto exising_id = getFragmentByID(id);
if (_reg.valid(exising_id)) {
return entt::null;
}
}
{ // next check if space in memory budget
const auto free_memory = _memory_budget - _memory_usage;
if (initial_size > free_memory) {
return entt::null;
}
}
// actually allocate and create
auto new_data = std::make_unique<std::vector<uint8_t>>(initial_size);
if (!static_cast<bool>(new_data)) {
// allocation failure
return entt::null;
}
_memory_usage += initial_size;
const auto new_frag = _reg.create();
_reg.emplace<FragComp::ID>(new_frag, id);
// TODO: memory comp
_reg.emplace<std::unique_ptr<std::vector<uint8_t>>>(new_frag) = std::move(new_data);
throwEventConstruct(new_frag);
return new_frag;
}
FragmentID FragmentStore::newFragmentFile(
std::string_view store_path,
MetaFileType mft,
const std::vector<uint8_t>& id
) {
{ // first check if id is already used
const auto exising_id = getFragmentByID(id);
if (_reg.valid(exising_id)) {
return entt::null;
}
}
if (store_path.empty()) {
store_path = _default_store_path;
}
std::filesystem::create_directories(store_path);
const auto id_hex = bin2hex(id);
std::filesystem::path fragment_file_path;
if (id_hex.size() < 6) {
fragment_file_path = std::filesystem::path{store_path}/id_hex;
} else {
// use the first 2hex (1byte) as a subfolder
std::filesystem::create_directories(std::string{store_path} + id_hex.substr(0, 2));
fragment_file_path = std::filesystem::path{std::string{store_path} + id_hex.substr(0, 2)} / id_hex.substr(2);
}
if (std::filesystem::exists(fragment_file_path)) {
return entt::null;
}
const auto new_frag = _reg.create();
_reg.emplace<FragComp::ID>(new_frag, id);
// file (info) comp
_reg.emplace<ObjComp::Ephemeral::FilePath>(new_frag, fragment_file_path.generic_u8string());
_reg.emplace<ObjComp::Ephemeral::MetaFileType>(new_frag, mft);
// meta needs to be synced to file
std::function<write_to_storage_fetch_data_cb> empty_data_cb = [](auto*, auto) -> uint64_t { return 0; };
if (!syncToStorage(new_frag, empty_data_cb)) {
std::cerr << "FS error: syncToStorage failed while creating new fragment file\n";
_reg.destroy(new_frag);
return entt::null;
}
// while new metadata might be created here, making sure the file could be created is more important
throwEventConstruct(new_frag);
return new_frag;
}
FragmentID FragmentStore::newFragmentFile(
std::string_view store_path,
MetaFileType mft
) {
return newFragmentFile(store_path, mft, generateNewUID());
}
FragmentID FragmentStore::getFragmentByID(
const std::vector<uint8_t>& id
) {
// TODO: accelerate
// maybe keep it sorted and binary search? hash table lookup?
for (const auto& [frag, id_comp] : _reg.view<FragComp::ID>().each()) {
if (id == id_comp.v) {
return frag;
}
}
return entt::null;
}
FragmentID FragmentStore::getFragmentCustomMatcher(
std::function<bool(FragmentID)>& fn
) {
return entt::null;
}
bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage_fetch_data_cb>& data_cb) {
if (!_reg.valid(fid)) {
return false;
}
if (!_reg.all_of<ObjComp::Ephemeral::FilePath>(fid)) {
// not a file fragment?
return false;
}
// split object storage
MetaFileType meta_type = MetaFileType::TEXT_JSON; // TODO: better defaults
if (_reg.all_of<ObjComp::Ephemeral::MetaFileType>(fid)) {
meta_type = _reg.get<ObjComp::Ephemeral::MetaFileType>(fid).type;
}
Encryption meta_enc = Encryption::NONE; // TODO: better defaults
Compression meta_comp = Compression::NONE; // TODO: better defaults
if (meta_type != MetaFileType::TEXT_JSON) {
if (_reg.all_of<ObjComp::Ephemeral::MetaEncryptionType>(fid)) {
meta_enc = _reg.get<ObjComp::Ephemeral::MetaEncryptionType>(fid).enc;
}
if (_reg.all_of<ObjComp::Ephemeral::MetaCompressionType>(fid)) {
meta_comp = _reg.get<ObjComp::Ephemeral::MetaCompressionType>(fid).comp;
}
} else {
// we cant have encryption or compression
// so we force NONE for TEXT JSON
_reg.emplace_or_replace<ObjComp::Ephemeral::MetaEncryptionType>(fid, Encryption::NONE);
_reg.emplace_or_replace<ObjComp::Ephemeral::MetaCompressionType>(fid, Compression::NONE);
}
std::filesystem::path meta_tmp_path = _reg.get<ObjComp::Ephemeral::FilePath>(fid).path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp";
meta_tmp_path.replace_filename("." + meta_tmp_path.filename().generic_u8string());
// TODO: make meta comp work with mem compressor
//auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp);
std::stack<std::unique_ptr<File2I>> meta_file_stack;
meta_file_stack.push(std::make_unique<File2WFile>(std::string_view{meta_tmp_path.generic_u8string()}));
if (meta_file_stack.empty()) {
std::cerr << "FS error: failed to create temporary meta file stack\n";
std::filesystem::remove(meta_tmp_path); // might have created an empty file
return false;
}
Encryption data_enc = Encryption::NONE; // TODO: better defaults
Compression data_comp = Compression::NONE; // TODO: better defaults
if (_reg.all_of<FragComp::DataEncryptionType>(fid)) {
data_enc = _reg.get<FragComp::DataEncryptionType>(fid).enc;
}
if (_reg.all_of<FragComp::DataCompressionType>(fid)) {
data_comp = _reg.get<FragComp::DataCompressionType>(fid).comp;
}
std::filesystem::path data_tmp_path = _reg.get<ObjComp::Ephemeral::FilePath>(fid).path + ".tmp";
data_tmp_path.replace_filename("." + data_tmp_path.filename().generic_u8string());
auto data_file_stack = buildFileStackWrite(std::string_view{data_tmp_path.generic_u8string()}, data_enc, data_comp);
if (data_file_stack.empty()) {
while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
std::filesystem::remove(meta_tmp_path);
std::cerr << "FS error: failed to create temporary data file stack\n";
return false;
}
try { // TODO: properly sanitize strings, so this cant throw
// sharing code between binary msgpack and text json for now
nlohmann::json meta_data_j = nlohmann::json::object(); // metadata needs to be an object, null not allowed
// metadata file
for (const auto& [type_id, storage] : _reg.storage()) {
if (!storage.contains(fid)) {
continue;
}
//std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n";
// use type_id to find serializer
auto s_cb_it = _sc._serl_json.find(type_id);
if (s_cb_it == _sc._serl_json.end()) {
// could not find serializer, not saving
continue;
}
// noooo, why cant numbers be keys
//if (meta_type == MetaFileType::BINARY_MSGPACK) { // msgpack uses the hash id instead
//s_cb_it->second(storage.value(fid), meta_data[storage.type().hash()]);
//} else if (meta_type == MetaFileType::TEXT_JSON) {
s_cb_it->second({_reg, fid}, meta_data_j[storage.type().name()]);
//}
}
if (meta_type == MetaFileType::BINARY_MSGPACK) { // binary metadata file
std::vector<uint8_t> binary_meta_data;
{
std::stack<std::unique_ptr<File2I>> binary_writer_stack;
binary_writer_stack.push(std::make_unique<File2MemW>(binary_meta_data));
if (!buildStackWrite(binary_writer_stack, meta_enc, meta_comp)) {
while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
std::filesystem::remove(meta_tmp_path);
while (!data_file_stack.empty()) { data_file_stack.pop(); }
std::filesystem::remove(data_tmp_path);
std::cerr << "FS error: binary writer creation failed '" << _reg.get<ObjComp::Ephemeral::FilePath>(fid).path << "'\n";
return false;
}
{
const std::vector<uint8_t> meta_data = nlohmann::json::to_msgpack(meta_data_j);
if (!binary_writer_stack.top()->write(ByteSpan{meta_data})) {
// i feel like exceptions or refactoring would be nice here
while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
std::filesystem::remove(meta_tmp_path);
while (!data_file_stack.empty()) { data_file_stack.pop(); }
std::filesystem::remove(data_tmp_path);
std::cerr << "FS error: binary writer failed '" << _reg.get<ObjComp::Ephemeral::FilePath>(fid).path << "'\n";
return false;
}
}
}
//// the meta file is itself msgpack data
nlohmann::json meta_header_j = nlohmann::json::array();
meta_header_j.emplace_back() = "SOLMET";
meta_header_j.push_back(meta_enc);
meta_header_j.push_back(meta_comp);
// with a custom msgpack impl like cmp, we can be smarter here and dont need an extra buffer
meta_header_j.push_back(nlohmann::json::binary(binary_meta_data));
const auto meta_header_data = nlohmann::json::to_msgpack(meta_header_j);
meta_file_stack.top()->write({meta_header_data.data(), meta_header_data.size()});
} else if (meta_type == MetaFileType::TEXT_JSON) {
// cant be compressed or encrypted
const auto meta_file_json_str = meta_data_j.dump(2, ' ', true);
meta_file_stack.top()->write({reinterpret_cast<const uint8_t*>(meta_file_json_str.data()), meta_file_json_str.size()});
}
} catch (...) {
while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
std::filesystem::remove(meta_tmp_path);
while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
std::filesystem::remove(data_tmp_path);
std::cerr << "FS error: failed to serialize json data\n";
return false;
}
// now data
// for zstd compression, chunk size is frame size. (no cross frame referencing)
// TODO: add buffering steam layer
static constexpr int64_t chunk_size{1024*1024}; // 1MiB should be enough
std::vector<uint8_t> buffer(chunk_size);
uint64_t buffer_actual_size {0};
do {
buffer_actual_size = data_cb(buffer.data(), buffer.size());
if (buffer_actual_size == 0) {
break;
}
if (buffer_actual_size > buffer.size()) {
// wtf
break;
}
data_file_stack.top()->write({buffer.data(), buffer_actual_size});
} while (buffer_actual_size == buffer.size());
//meta_file.flush();
//meta_file.close();
while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
//data_file.flush();
//data_file.close();
while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
std::filesystem::rename(
meta_tmp_path,
_reg.get<ObjComp::Ephemeral::FilePath>(fid).path + ".meta" + metaFileTypeSuffix(meta_type)
);
std::filesystem::rename(
data_tmp_path,
_reg.get<ObjComp::Ephemeral::FilePath>(fid).path
);
// TODO: check return value of renames
if (_reg.all_of<ObjComp::Ephemeral::DirtyTag>(fid)) {
_reg.remove<ObjComp::Ephemeral::DirtyTag>(fid);
}
return true;
}
bool FragmentStore::syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size) {
std::function<FragmentStore::write_to_storage_fetch_data_cb> fn_cb = [read = 0ull, data, data_size](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t {
uint64_t i = 0;
for (; i+read < data_size && i < buffer_size; i++) {
request_buffer[i] = data[i+read];
}
read += i;
return i;
};
return syncToStorage(fid, fn_cb);
}
bool FragmentStore::loadFromStorage(FragmentID fid, std::function<read_from_storage_put_data_cb>& data_cb) {
if (!_reg.valid(fid)) {
return false;
}
if (!_reg.all_of<ObjComp::Ephemeral::FilePath>(fid)) {
// not a file fragment?
// TODO: memory fragments
return false;
}
const auto& frag_path = _reg.get<ObjComp::Ephemeral::FilePath>(fid).path;
// TODO: check if metadata dirty?
// TODO: what if file changed on disk?
std::cout << "FS: loading fragment '" << frag_path << "'\n";
Compression data_comp = Compression::NONE;
if (_reg.all_of<FragComp::DataCompressionType>(fid)) {
data_comp = _reg.get<FragComp::DataCompressionType>(fid).comp;
}
auto data_file_stack = buildFileStackRead(std::string_view{frag_path}, Encryption::NONE, data_comp);
if (data_file_stack.empty()) {
return false;
}
// TODO: make it read in a single chunk instead?
static constexpr int64_t chunk_size {1024 * 1024}; // 1MiB should be good for read
do {
auto data_var = data_file_stack.top()->read(chunk_size);
ByteSpan data = spanFromRead(data_var);
if (data.empty()) {
// error or probably eof
break;
}
data_cb(data);
if (data.size < chunk_size) {
// eof
break;
}
} while (data_file_stack.top()->isGood());
return true;
}
nlohmann::json FragmentStore::loadFromStorageNJ(FragmentID fid) {
std::vector<uint8_t> tmp_buffer;
std::function<read_from_storage_put_data_cb> cb = [&tmp_buffer](const ByteSpan buffer) {
tmp_buffer.insert(tmp_buffer.end(), buffer.cbegin(), buffer.cend());
};
if (!loadFromStorage(fid, cb)) {
return nullptr;
}
return nlohmann::json::parse(tmp_buffer);
}
size_t FragmentStore::scanStoragePath(std::string_view path) {
if (path.empty()) {
path = _default_store_path;
}
// TODO: extract so async can work (or/and make iteratable generator)
if (!std::filesystem::is_directory(path)) {
std::cerr << "FS error: scan path not a directory '" << path << "'\n";
return 0;
}
// step 1: make snapshot of files, validate metafiles and save id/path+meta.ext
// can be extra thread (if non vfs)
struct FragFileEntry {
std::string id_str;
std::filesystem::path frag_path;
std::string meta_ext;
bool operator==(const FragFileEntry& other) const {
// only compare by id
return id_str == other.id_str;
}
};
struct FragFileEntryHash {
size_t operator()(const FragFileEntry& it) const {
return entt::hashed_string(it.id_str.data(), it.id_str.size());
}
};
entt::dense_set<FragFileEntry, FragFileEntryHash> file_frag_list;
std::filesystem::path storage_path{path};
auto handle_file = [&](const std::filesystem::path& file_path) {
if (!std::filesystem::is_regular_file(file_path)) {
return;
}
// handle file
if (file_path.has_extension()) {
// skip over metadata, assuming only metafiles have extentions (might be wrong?)
// also skips temps
return;
}
auto relative_path = std::filesystem::proximate(file_path, storage_path);
std::string id_str = relative_path.generic_u8string();
// delete all '/'
id_str.erase(std::remove(id_str.begin(), id_str.end(), '/'), id_str.end());
if (id_str.size() % 2 != 0) {
std::cerr << "FS error: non hex fragment uid detected: '" << id_str << "'\n";
}
if (file_frag_list.contains(FragFileEntry{id_str, {}, ""})) {
std::cerr << "FS error: fragment duplicate detected: '" << id_str << "'\n";
return; // skip
}
const char* meta_ext = ".meta.msgpack";
{ // find meta
// TODO: this as to know all possible extentions
bool has_meta_msgpack = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.msgpack");
bool has_meta_json = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.json");
const size_t meta_sum =
(has_meta_msgpack?1:0) +
(has_meta_json?1:0)
;
if (meta_sum > 1) { // has multiple
std::cerr << "FS error: fragment with multiple meta files detected: " << id_str << "\n";
return; // skip
}
if (meta_sum == 0) {
std::cerr << "FS error: fragment missing meta file detected: " << id_str << "\n";
return; // skip
}
if (has_meta_json) {
meta_ext = ".meta.json";
}
}
file_frag_list.emplace(FragFileEntry{
std::move(id_str),
file_path,
meta_ext
});
};
for (const auto& outer_path : std::filesystem::directory_iterator(storage_path)) {
if (std::filesystem::is_regular_file(outer_path)) {
handle_file(outer_path);
} else if (std::filesystem::is_directory(outer_path)) {
// subdir, part of id
for (const auto& inner_path : std::filesystem::directory_iterator(outer_path)) {
//if (std::filesystem::is_regular_file(inner_path)) {
//// handle file
//} // TODO: support deeper recursion?
handle_file(inner_path);
}
}
}
std::cout << "FS: scan found:\n";
for (const auto& it : file_frag_list) {
std::cout << " " << it.id_str << "\n";
}
// step 2: check if files preexist in reg
// main thread
// (merge into step 3 and do more error checking?)
for (auto it = file_frag_list.begin(); it != file_frag_list.end();) {
auto id = hex2bin(it->id_str);
auto fid = getFragmentByID(id);
if (_reg.valid(fid)) {
// pre exising (handle differently??)
// check if store differs?
it = file_frag_list.erase(it);
} else {
it++;
}
}
std::vector<FragmentID> scanned_frags;
// step 3: parse meta and insert into reg of non preexising
// main thread
// TODO: check timestamps of preexisting and reload? mark external/remote dirty?
for (const auto& it : file_frag_list) {
nlohmann::json j;
if (it.meta_ext == ".meta.msgpack") {
std::ifstream file(it.frag_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary);
if (!file.is_open()) {
std::cout << "FS error: failed opening meta " << it.frag_path << "\n";
continue;
}
// file is a msgpack within a msgpack
std::vector<uint8_t> full_meta_data;
{ // read meta file
// figure out size
file.seekg(0, file.end);
uint64_t file_size = file.tellg();
file.seekg(0, file.beg);
full_meta_data.resize(file_size);
file.read(reinterpret_cast<char*>(full_meta_data.data()), full_meta_data.size());
}
const auto meta_header_j = nlohmann::json::from_msgpack(full_meta_data, true, false);
if (!meta_header_j.is_array() || meta_header_j.size() < 4) {
std::cerr << "FS error: broken binary meta " << it.frag_path << "\n";
continue;
}
if (meta_header_j.at(0) != "SOLMET") {
std::cerr << "FS error: wrong magic '" << meta_header_j.at(0) << "' in meta " << it.frag_path << "\n";
continue;
}
Encryption meta_enc = meta_header_j.at(1);
if (meta_enc != Encryption::NONE) {
std::cerr << "FS error: unknown encryption " << it.frag_path << "\n";
continue;
}
Compression meta_comp = meta_header_j.at(2);
if (meta_comp != Compression::NONE && meta_comp != Compression::ZSTD) {
std::cerr << "FS error: unknown compression " << it.frag_path << "\n";
continue;
}
//const auto& meta_data_ref = meta_header_j.at(3).is_binary()?meta_header_j.at(3):meta_header_j.at(3).at("data");
if (!meta_header_j.at(3).is_binary()) {
std::cerr << "FS error: meta data not binary " << it.frag_path << "\n";
continue;
}
const nlohmann::json::binary_t& meta_data_ref = meta_header_j.at(3);
std::stack<std::unique_ptr<File2I>> binary_reader_stack;
binary_reader_stack.push(std::make_unique<File2MemR>(ByteSpan{meta_data_ref.data(), meta_data_ref.size()}));
if (!buildStackRead(binary_reader_stack, meta_enc, meta_comp)) {
std::cerr << "FS error: binary reader creation failed " << it.frag_path << "\n";
continue;
}
// HACK: read fixed amout of data, but this way if we have neither enc nor comp we pass the span through
auto binary_read_value = binary_reader_stack.top()->read(10*1024*1024); // is 10MiB large enough for meta?
const auto binary_read_span = spanFromRead(binary_read_value);
assert(binary_read_span.size < 10*1024*1024);
j = nlohmann::json::from_msgpack(binary_read_span, true, false);
} else if (it.meta_ext == ".meta.json") {
std::ifstream file(it.frag_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary);
if (!file.is_open()) {
std::cerr << "FS error: failed opening meta " << it.frag_path << "\n";
continue;
}
file >> j;
} else {
assert(false);
}
if (!j.is_object()) {
std::cerr << "FS error: json in meta is broken " << it.id_str << "\n";
continue;
}
// TODO: existing fragment file
//newFragmentFile();
FragmentHandle fh{_reg, _reg.create()};
fh.emplace<FragComp::ID>(hex2bin(it.id_str));
fh.emplace<ObjComp::Ephemeral::FilePath>(it.frag_path.generic_u8string());
for (const auto& [k, v] : j.items()) {
// type id from string hash
const auto type_id = entt::hashed_string(k.data(), k.size());
const auto deserl_fn_it = _sc._deserl_json.find(type_id);
if (deserl_fn_it != _sc._deserl_json.cend()) {
// TODO: check return value
deserl_fn_it->second(fh, v);
} else {
std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n";
}
}
scanned_frags.push_back(fh);
}
// TODO: mutex and move code to async and return this list ?
// throw new frag event here, after loading them all
for (const FragmentID fid : scanned_frags) {
throwEventConstruct(fid);
}
return scanned_frags.size();
}
void FragmentStore::scanStoragePathAsync(std::string path) {
// add path to queue
// HACK: if path is known/any fragment is in the path, this operation blocks (non async)
scanStoragePath(path); // TODO: make async and post result
}
static bool serl_json_data_enc_type(const FragmentHandle fh, nlohmann::json& out) {
out = static_cast<std::underlying_type_t<Encryption>>(
fh.get<FragComp::DataEncryptionType>().enc
);
return true;
}
static bool deserl_json_data_enc_type(FragmentHandle fh, const nlohmann::json& in) {
fh.emplace_or_replace<FragComp::DataEncryptionType>(
static_cast<Encryption>(
static_cast<std::underlying_type_t<Encryption>>(in)
)
);
return true;
}
static bool serl_json_data_comp_type(const FragmentHandle fh, nlohmann::json& out) {
out = static_cast<std::underlying_type_t<Compression>>(
fh.get<FragComp::DataCompressionType>().comp
);
return true;
}
static bool deserl_json_data_comp_type(FragmentHandle fh, const nlohmann::json& in) {
fh.emplace_or_replace<FragComp::DataCompressionType>(
static_cast<Compression>(
static_cast<std::underlying_type_t<Compression>>(in)
)
);
return true;
}
void FragmentStore::registerSerializers(void) {
_sc.registerSerializerJson<FragComp::DataEncryptionType>(serl_json_data_enc_type);
_sc.registerDeSerializerJson<FragComp::DataEncryptionType>(deserl_json_data_enc_type);
_sc.registerSerializerJson<FragComp::DataCompressionType>(serl_json_data_comp_type);
_sc.registerDeSerializerJson<FragComp::DataCompressionType>(deserl_json_data_comp_type);
}

View File

@ -1,98 +0,0 @@
#pragma once
#include <solanaceae/util/span.hpp>
#include "./fragment_store_i.hpp"
#include "./types.hpp"
#include "./meta_components.hpp"
#include "./serializer.hpp"
#include "./uuid_generator.hpp"
#include <entt/core/type_info.hpp>
#include <entt/entity/registry.hpp>
#include <nlohmann/json_fwd.hpp>
#include <vector>
#include <array>
#include <cstdint>
#include <random>
struct FragmentStore : public FragmentStoreI {
UUIDGenerator_128_128 _session_uuid_gen;
std::string _default_store_path;
uint64_t _memory_budget {10u*1024u*1024u};
uint64_t _memory_usage {0u};
SerializerCallbacks<FragmentID> _sc;
FragmentStore(void);
FragmentStore(std::array<uint8_t, 16> session_uuid_namespace);
// TODO: make the frags ref counted
// TODO: check for exising
std::vector<uint8_t> generateNewUID(void);
// ========== new fragment ==========
// memory backed owned
FragmentID newFragmentMemoryOwned(
const std::vector<uint8_t>& id,
size_t initial_size
);
// memory backed view (can only be added? not new?)
// file backed (rw...)
// needs to know which store path to put into
FragmentID newFragmentFile(
std::string_view store_path,
MetaFileType mft,
const std::vector<uint8_t>& id
);
// this variant generate a new, mostly unique, id for us
FragmentID newFragmentFile(
std::string_view store_path,
MetaFileType mft
);
// ========== add fragment ==========
// ========== get fragment ==========
FragmentID getFragmentByID(
const std::vector<uint8_t>& id
);
FragmentID getFragmentCustomMatcher(
std::function<bool(FragmentID)>& fn
);
// remove fragment?
// unload?
// ========== sync fragment to storage ==========
using write_to_storage_fetch_data_cb = uint64_t(uint8_t* request_buffer, uint64_t buffer_size);
// calls data_cb with a buffer to be filled in, cb returns actual count of data. if returned < max, its the last buffer.
bool syncToStorage(FragmentID fid, std::function<write_to_storage_fetch_data_cb>& data_cb);
bool syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size);
// ========== load fragment data from storage ==========
using read_from_storage_put_data_cb = void(const ByteSpan buffer);
bool loadFromStorage(FragmentID fid, std::function<read_from_storage_put_data_cb>& data_cb);
// convenience function
nlohmann::json loadFromStorageNJ(FragmentID fid);
// fragment discovery?
// returns number of new fragments
size_t scanStoragePath(std::string_view path);
void scanStoragePathAsync(std::string path);
private:
void registerSerializers(void); // internal comps
};

View File

@ -1,31 +0,0 @@
#include "./fragment_store_i.hpp"
#include <iostream>
FragmentRegistry& FragmentStoreI::registry(void) {
return _reg;
}
FragmentHandle FragmentStoreI::fragmentHandle(const FragmentID fid) {
return {_reg, fid};
}
void FragmentStoreI::throwEventConstruct(const FragmentID fid) {
std::cout << "FSI debug: event construct " << entt::to_integral(fid) << "\n";
dispatch(
FragmentStore_Event::fragment_construct,
Fragment::Events::FragmentConstruct{
FragmentHandle{_reg, fid}
}
);
}
void FragmentStoreI::throwEventUpdate(const FragmentID fid) {
std::cout << "FSI debug: event updated " << entt::to_integral(fid) << "\n";
dispatch(
FragmentStore_Event::fragment_updated,
Fragment::Events::FragmentUpdated{
FragmentHandle{_reg, fid}
}
);
}

View File

@ -1,63 +0,0 @@
#pragma once
#include <solanaceae/util/event_provider.hpp>
#include <entt/entity/registry.hpp>
#include <entt/entity/handle.hpp>
#include <cstdint>
// internal id
enum class FragmentID : uint32_t {};
using FragmentRegistry = entt::basic_registry<FragmentID>;
using FragmentHandle = entt::basic_handle<FragmentRegistry>;
namespace Fragment::Events {
struct FragmentConstruct {
const FragmentHandle e;
};
struct FragmentUpdated {
const FragmentHandle e;
};
//struct MessageDestory {
//const Message3Handle e;
//};
} // Fragment::Events
enum class FragmentStore_Event : uint32_t {
fragment_construct,
fragment_updated,
//message_destroy,
MAX
};
struct FragmentStoreEventI {
using enumType = FragmentStore_Event;
virtual ~FragmentStoreEventI(void) {}
virtual bool onEvent(const Fragment::Events::FragmentConstruct&) { return false; }
virtual bool onEvent(const Fragment::Events::FragmentUpdated&) { return false; }
//virtual bool onEvent(const Fragment::Events::MessageDestory&) { return false; }
};
using FragmentStoreEventProviderI = EventProviderI<FragmentStoreEventI>;
struct FragmentStoreI : public FragmentStoreEventProviderI {
static constexpr const char* version {"1"};
FragmentRegistry _reg;
virtual ~FragmentStoreI(void) {}
FragmentRegistry& registry(void);
FragmentHandle fragmentHandle(const FragmentID fid);
void throwEventConstruct(const FragmentID fid);
void throwEventUpdate(const FragmentID fid);
// TODO
//void throwEventDestroy();
};

View File

@ -1,5 +1,7 @@
#include "./message_fragment_store.hpp"
#include "./serializer_json.hpp"
#include "../json/message_components.hpp"
#include <solanaceae/util/utils.hpp>
@ -28,8 +30,8 @@ namespace Message::Components {
////std::vector<uint8_t> uid;
//FragmentID id;
//};
// only contains fragments with <1024 messages and <28h tsrage (or whatever)
entt::dense_set<FragmentID> fid_open;
// only contains fragments with <1024 messages and <2h tsrage (or whatever)
entt::dense_set<Object> fid_open;
};
// all message fragments of this contact
@ -40,30 +42,30 @@ namespace Message::Components {
size_t i_b;
size_t i_e;
};
entt::dense_map<FragmentID, InternalEntry> frags;
entt::dense_map<Object, InternalEntry> frags;
// add 2 sorted contact lists for both range begin and end
// TODO: adding and removing becomes expensive with enough frags, consider splitting or heap
std::vector<FragmentID> sorted_begin;
std::vector<FragmentID> sorted_end;
std::vector<Object> sorted_begin;
std::vector<Object> sorted_end;
// api
// return true if it was actually inserted
bool insert(FragmentHandle frag);
bool erase(FragmentID frag);
bool insert(ObjectHandle frag);
bool erase(Object frag);
// update? (just erase() + insert())
// uses range begin to go back in time
FragmentID prev(FragmentID frag) const;
Object prev(Object frag) const;
// uses range end to go forward in time
FragmentID next(FragmentID frag) const;
Object next(Object frag) const;
};
// all LOADED message fragments
// TODO: merge into ContactFragments (and pull in openfrags)
struct LoadedContactFragments final {
// kept up-to-date by events
entt::dense_set<FragmentID> frags;
entt::dense_set<Object> frags;
};
} // Message::Components
@ -84,6 +86,23 @@ namespace ObjectStore::Components {
}
} // ObjectStore::Component
static nlohmann::json loadFromStorageNJ(ObjectHandle oh) {
assert(oh.all_of<ObjComp::Ephemeral::Backend>());
auto* backend = oh.get<ObjComp::Ephemeral::Backend>().ptr;
assert(backend != nullptr);
std::vector<uint8_t> tmp_buffer;
std::function<StorageBackendI::read_from_storage_put_data_cb> cb = [&tmp_buffer](const ByteSpan buffer) {
tmp_buffer.insert(tmp_buffer.end(), buffer.cbegin(), buffer.cend());
};
if (!backend->read(oh, cb)) {
std::cerr << "failed to read obj '" << bin2hex(oh.get<ObjComp::ID>().v) << "'\n";
return false;
}
return nlohmann::json::parse(tmp_buffer, nullptr, false);
}
void MessageFragmentStore::handleMessage(const Message3Handle& m) {
if (_fs_ignore_event) {
// message event because of us loading a fragment, ignore
@ -113,8 +132,8 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
}
// TODO: use fid, seving full fuid for every message consumes alot of memory (and heap frag)
if (!m.all_of<Message::Components::FID>()) {
std::cout << "MFS: new msg missing FID\n";
if (!m.all_of<Message::Components::Obj>()) {
std::cout << "MFS: new msg missing Object\n";
if (!m.registry()->ctx().contains<Message::Components::OpenFragments>()) {
m.registry()->ctx().emplace<Message::Components::OpenFragments>();
}
@ -125,11 +144,11 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
// missing fuid
// find closesed non-sealed off fragment
FragmentID fragment_id{entt::null};
Object fragment_id{entt::null};
// first search for fragment where the ts falls into the range
for (const auto& fid : fid_open) {
auto fh = _fs.fragmentHandle(fid);
auto fh = _os.objectHandle(fid);
assert(static_cast<bool>(fh));
// assuming ts range exists
@ -143,9 +162,9 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
}
// if it did not fit into an existing fragment, we next look for fragments that could be extended
if (!_fs._reg.valid(fragment_id)) {
if (!_os._reg.valid(fragment_id)) {
for (const auto& fid : fid_open) {
auto fh = _fs.fragmentHandle(fid);
auto fh = _os.objectHandle(fid);
assert(static_cast<bool>(fh));
// assuming ts range exists
@ -195,11 +214,13 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
}
// if its still not found, we need a new fragment
if (!_fs._reg.valid(fragment_id)) {
const auto new_fid = _fs.newFragmentFile("test_message_store/", MetaFileType::BINARY_MSGPACK);
auto fh = _fs.fragmentHandle(new_fid);
if (!_os.registry().valid(fragment_id)) {
const auto new_uuid = _session_uuid_gen();
_fs_ignore_event = true;
auto fh = _sb.newObject(ByteSpan{new_uuid});
_fs_ignore_event = false;
if (!static_cast<bool>(fh)) {
std::cout << "MFS error: failed to create new fragment for message\n";
std::cout << "MFS error: failed to create new object for message\n";
return;
}
@ -238,17 +259,17 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
std::cout << "MFS: created new fragment " << bin2hex(fh.get<ObjComp::ID>().v) << "\n";
_fs_ignore_event = true;
_fs.throwEventConstruct(fh);
_os.throwEventConstruct(fh);
_fs_ignore_event = false;
}
// if this is still empty, something is very wrong and we exit here
if (!_fs._reg.valid(fragment_id)) {
if (!_os.registry().valid(fragment_id)) {
std::cout << "MFS error: failed to find/create fragment for message\n";
return;
}
m.emplace_or_replace<Message::Components::FID>(fragment_id);
m.emplace_or_replace<Message::Components::Obj>(fragment_id);
// in this case we know the fragment needs an update
for (const auto& it : _fuid_save_queue) {
@ -257,11 +278,11 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
return; // done
}
}
_fuid_save_queue.push_back({Message::getTimeMS(), fragment_id, m.registry()});
_fuid_save_queue.push_back({Message::getTimeMS(), {_os.registry(), fragment_id}, m.registry()});
return; // done
}
const auto msg_fh = _fs.fragmentHandle(m.get<Message::Components::FID>().fid);
const auto msg_fh = _os.objectHandle(m.get<Message::Components::Obj>().o);
if (!static_cast<bool>(msg_fh)) {
std::cerr << "MFS error: fid in message is invalid\n";
return; // TODO: properly handle this case
@ -290,9 +311,9 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
// assumes not loaded frag
// need update from frag
void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh) {
void MessageFragmentStore::loadFragment(Message3Registry& reg, ObjectHandle fh) {
std::cout << "MFS: loadFragment\n";
const auto j = _fs.loadFromStorageNJ(fh);
const auto j = loadFromStorageNJ(fh);
if (!j.is_array()) {
// wrong data
@ -339,7 +360,7 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh
}
}
new_real_msg.emplace_or_replace<Message::Components::FID>(fh);
new_real_msg.emplace_or_replace<Message::Components::Obj>(fh);
// dup check (hacky, specific to protocols)
Message3 dup_msg {entt::null};
@ -393,7 +414,7 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh
}
}
bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry& reg) {
bool MessageFragmentStore::syncFragToStorage(ObjectHandle fh, Message3Registry& reg) {
auto& ftsrange = fh.get_or_emplace<ObjComp::MessagesTSRange>(Message::getTimeMS(), Message::getTimeMS());
auto j = nlohmann::json::array();
@ -404,7 +425,7 @@ bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry
for (auto it = msg_view.rbegin(), it_end = msg_view.rend(); it != it_end; it++) {
const Message3 m = *it;
if (!reg.all_of<Message::Components::FID, Message::Components::ContactFrom, Message::Components::ContactTo>(m)) {
if (!reg.all_of<Message::Components::Obj, Message::Components::ContactFrom, Message::Components::ContactTo>(m)) {
continue;
}
@ -413,7 +434,7 @@ bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry
continue;
}
if (_fuid_save_queue.front().id != reg.get<Message::Components::FID>(m).fid) {
if (_fuid_save_queue.front().id != reg.get<Message::Components::Obj>(m).o) {
continue; // not ours
}
@ -452,10 +473,13 @@ bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry
// if save as binary
//nlohmann::json::to_msgpack(j);
auto j_dump = j.dump(2, ' ', true);
if (_fs.syncToStorage(fh, reinterpret_cast<const uint8_t*>(j_dump.data()), j_dump.size())) {
assert(fh.all_of<ObjComp::Ephemeral::Backend>());
auto* backend = fh.get<ObjComp::Ephemeral::Backend>().ptr;
//if (_os.syncToStorage(fh, reinterpret_cast<const uint8_t*>(j_dump.data()), j_dump.size())) {
if (backend->write(fh, {reinterpret_cast<const uint8_t*>(j_dump.data()), j_dump.size()})) {
// TODO: make this better, should this be called on fail? should this be called before sync? (prob not)
_fs_ignore_event = true;
_fs.throwEventUpdate(fh);
_os.throwEventUpdate(fh);
_fs_ignore_event = false;
//std::cout << "MFS: dumped " << j_dump << "\n";
@ -470,30 +494,32 @@ bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry
MessageFragmentStore::MessageFragmentStore(
Contact3Registry& cr,
RegistryMessageModel& rmm,
FragmentStore& fs
) : _cr(cr), _rmm(rmm), _fs(fs), _sc{_cr, {}, {}} {
ObjectStore2& os,
StorageBackendI& sb
) : _cr(cr), _rmm(rmm), _os(os), _sb(sb), _sc{_cr, {}, {}} {
_rmm.subscribe(this, RegistryMessageModel_Event::message_construct);
_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);
_fs._sc.registerSerializerJson<ObjComp::MessagesTSRange>();
_fs._sc.registerDeSerializerJson<ObjComp::MessagesTSRange>();
_fs._sc.registerSerializerJson<ObjComp::MessagesContact>();
_fs._sc.registerDeSerializerJson<ObjComp::MessagesContact>();
auto& sjc = _os.registry().ctx().get<SerializerJsonCallbacks<Object>>();
sjc.registerSerializer<ObjComp::MessagesTSRange>();
sjc.registerDeSerializer<ObjComp::MessagesTSRange>();
sjc.registerSerializer<ObjComp::MessagesContact>();
sjc.registerDeSerializer<ObjComp::MessagesContact>();
// old
_fs._sc.registerSerializerJson<FragComp::MessagesTSRange>(_fs._sc.component_get_json<ObjComp::MessagesTSRange>);
_fs._sc.registerDeSerializerJson<FragComp::MessagesTSRange>(_fs._sc.component_emplace_or_replace_json<ObjComp::MessagesTSRange>);
_fs._sc.registerSerializerJson<FragComp::MessagesContact>(_fs._sc.component_get_json<ObjComp::MessagesContact>);
_fs._sc.registerDeSerializerJson<FragComp::MessagesContact>(_fs._sc.component_emplace_or_replace_json<ObjComp::MessagesContact>);
sjc.registerSerializer<FragComp::MessagesTSRange>(sjc.component_get_json<ObjComp::MessagesTSRange>);
sjc.registerDeSerializer<FragComp::MessagesTSRange>(sjc.component_emplace_or_replace_json<ObjComp::MessagesTSRange>);
sjc.registerSerializer<FragComp::MessagesContact>(sjc.component_get_json<ObjComp::MessagesContact>);
sjc.registerDeSerializer<FragComp::MessagesContact>(sjc.component_emplace_or_replace_json<ObjComp::MessagesContact>);
_fs.subscribe(this, FragmentStore_Event::fragment_construct);
_fs.subscribe(this, FragmentStore_Event::fragment_updated);
_os.subscribe(this, ObjectStore_Event::object_construct);
_os.subscribe(this, ObjectStore_Event::object_update);
}
MessageFragmentStore::~MessageFragmentStore(void) {
while (!_fuid_save_queue.empty()) {
auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id);
auto fh = _fuid_save_queue.front().id;
auto* reg = _fuid_save_queue.front().reg;
assert(reg != nullptr);
syncFragToStorage(fh, *reg);
@ -570,7 +596,7 @@ float MessageFragmentStore::tick(float) {
if (!_fuid_save_queue.empty()) {
// wait 10sec before saving
if (_fuid_save_queue.front().ts_since_dirty + 10*1000 <= ts_now) {
auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id);
auto fh = _fuid_save_queue.front().id;
auto* reg = _fuid_save_queue.front().reg;
assert(reg != nullptr);
if (syncFragToStorage(fh, *reg)) {
@ -587,7 +613,7 @@ float MessageFragmentStore::tick(float) {
const bool had_events = !_event_check_queue.empty();
for (size_t i = 0; i < 10 && !_event_check_queue.empty(); i++) {
std::cout << "MFS: event check\n";
auto fh = _fs.fragmentHandle(_event_check_queue.front().fid);
auto fh = _event_check_queue.front().fid;
auto c = _event_check_queue.front().c;
_event_check_queue.pop_front();
@ -643,7 +669,7 @@ float MessageFragmentStore::tick(float) {
continue;
}
auto fh = _fs.fragmentHandle(fid);
auto fh = _os.objectHandle(fid);
if (!static_cast<bool>(fh)) {
std::cerr << "MFS error: frag is invalid\n";
@ -696,25 +722,25 @@ float MessageFragmentStore::tick(float) {
cf.sorted_end.crbegin(),
cf.sorted_end.crend(),
ts_begin_comp.ts,
[&](const FragmentID element, const auto& value) -> bool {
return _fs._reg.get<ObjComp::MessagesTSRange>(element).end >= value;
[&](const Object element, const auto& value) -> bool {
return _os.registry().get<ObjComp::MessagesTSRange>(element).end >= value;
}
);
FragmentID next_frag{entt::null};
Object next_frag{entt::null};
if (right != cf.sorted_end.crend()) {
next_frag = cf.next(*right);
}
// we checked earlier that cf is not empty
if (!_fs._reg.valid(next_frag)) {
if (!_os.registry().valid(next_frag)) {
// fall back to closest, cf is not empty
next_frag = cf.sorted_end.front();
}
// a single adjacent frag is often not enough
// only ok bc next is cheap
for (size_t i = 0; i < 5 && _fs._reg.valid(next_frag); next_frag = cf.next(next_frag)) {
auto fh = _fs.fragmentHandle(next_frag);
for (size_t i = 0; i < 5 && _os.registry().valid(next_frag); next_frag = cf.next(next_frag)) {
auto fh = _os.objectHandle(next_frag);
if (fh.any_of<ObjComp::Ephemeral::MessagesEmptyTag>()) {
continue; // skip known empty
}
@ -744,25 +770,25 @@ float MessageFragmentStore::tick(float) {
cf.sorted_begin.cbegin(),
cf.sorted_begin.cend(),
ts_end,
[&](const FragmentID element, const auto& value) -> bool {
return _fs._reg.get<ObjComp::MessagesTSRange>(element).begin < value;
[&](const Object element, const auto& value) -> bool {
return _os.registry().get<ObjComp::MessagesTSRange>(element).begin < value;
}
);
FragmentID prev_frag{entt::null};
Object prev_frag{entt::null};
if (left != cf.sorted_begin.cend()) {
prev_frag = cf.prev(*left);
}
// we checked earlier that cf is not empty
if (!_fs._reg.valid(prev_frag)) {
if (!_os.registry().valid(prev_frag)) {
// fall back to closest, cf is not empty
prev_frag = cf.sorted_begin.back();
}
// a single adjacent frag is often not enough
// only ok bc next is cheap
for (size_t i = 0; i < 5 && _fs._reg.valid(prev_frag); prev_frag = cf.prev(prev_frag)) {
auto fh = _fs.fragmentHandle(prev_frag);
for (size_t i = 0; i < 5 && _os.registry().valid(prev_frag); prev_frag = cf.prev(prev_frag)) {
auto fh = _os.objectHandle(prev_frag);
if (fh.any_of<ObjComp::Ephemeral::MessagesEmptyTag>()) {
continue; // skip known empty
}
@ -791,10 +817,6 @@ float MessageFragmentStore::tick(float) {
return 1000.f*60.f*60.f;
}
void MessageFragmentStore::triggerScan(void) {
_fs.scanStoragePath("test_message_store/");
}
bool MessageFragmentStore::onEvent(const Message::Events::MessageConstruct& e) {
handleMessage(e.e);
return false;
@ -807,7 +829,7 @@ bool MessageFragmentStore::onEvent(const Message::Events::MessageUpdated& e) {
// TODO: handle deletes? diff between unload?
bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) {
bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectConstruct& e) {
if (_fs_ignore_event) {
return false; // skip self
}
@ -854,7 +876,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e)
return false;
}
bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentUpdated& e) {
bool MessageFragmentStore::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
if (_fs_ignore_event) {
return false; // skip self
}
@ -911,7 +933,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentUpdated& e) {
return false;
}
bool Message::Components::ContactFragments::insert(FragmentHandle frag) {
bool Message::Components::ContactFragments::insert(ObjectHandle frag) {
if (frags.contains(frag)) {
return false;
}
@ -926,7 +948,7 @@ bool Message::Components::ContactFragments::insert(FragmentHandle frag) {
const auto pos = std::find_if(
sorted_begin.cbegin(),
sorted_begin.cend(),
[frag](const FragmentID a) -> bool {
[frag](const Object a) -> bool {
const auto begin_a = frag.registry()->get<ObjComp::MessagesTSRange>(a).begin;
const auto begin_frag = frag.get<ObjComp::MessagesTSRange>().begin;
if (begin_a > begin_frag) {
@ -951,7 +973,7 @@ bool Message::Components::ContactFragments::insert(FragmentHandle frag) {
const auto pos = std::find_if_not(
sorted_end.cbegin(),
sorted_end.cend(),
[frag](const FragmentID a) -> bool {
[frag](const Object a) -> bool {
const auto end_a = frag.registry()->get<ObjComp::MessagesTSRange>(a).end;
const auto end_frag = frag.get<ObjComp::MessagesTSRange>().end;
if (end_a > end_frag) {
@ -984,7 +1006,7 @@ bool Message::Components::ContactFragments::insert(FragmentHandle frag) {
return true;
}
bool Message::Components::ContactFragments::erase(FragmentID frag) {
bool Message::Components::ContactFragments::erase(Object frag) {
auto frags_it = frags.find(frag);
if (frags_it == frags.end()) {
return false;
@ -1001,7 +1023,7 @@ bool Message::Components::ContactFragments::erase(FragmentID frag) {
return true;
}
FragmentID Message::Components::ContactFragments::prev(FragmentID frag) const {
Object Message::Components::ContactFragments::prev(Object frag) const {
// uses range begin to go back in time
auto it = frags.find(frag);
@ -1017,7 +1039,7 @@ FragmentID Message::Components::ContactFragments::prev(FragmentID frag) const {
return entt::null;
}
FragmentID Message::Components::ContactFragments::next(FragmentID frag) const {
Object Message::Components::ContactFragments::next(Object frag) const {
// uses range end to go forward in time
auto it = frags.find(frag);

View File

@ -1,12 +1,12 @@
#pragma once
#include "./meta_components.hpp"
#include "./fragment_store_i.hpp"
#include "./fragment_store.hpp"
#include "./object_store.hpp"
#include "./uuid_generator.hpp"
#include "./message_serializer.hpp"
#include <entt/entity/registry.hpp>
#include <entt/container/dense_map.hpp>
#include <entt/container/dense_set.hpp>
@ -22,8 +22,8 @@ namespace Message::Components {
// unused, consumes too much memory (highly compressable)
//using FUID = FragComp::ID;
struct FID {
FragmentID fid {entt::null};
struct Obj {
Object o {entt::null};
};
// points to the front/newer message
@ -79,32 +79,35 @@ namespace Fragment::Components {
// on new message: assign fuid
// on new and update: mark as fragment dirty
// on delete: mark as fragment dirty?
class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentStoreEventI {
class MessageFragmentStore : public RegistryMessageModelEventI, public ObjectStoreEventI {
protected:
Contact3Registry& _cr;
RegistryMessageModel& _rmm;
FragmentStore& _fs;
ObjectStore2& _os;
StorageBackendI& _sb;
bool _fs_ignore_event {false};
UUIDGenerator_128_128 _session_uuid_gen;
// for message components only
MessageSerializerCallbacks _sc;
void handleMessage(const Message3Handle& m);
void loadFragment(Message3Registry& reg, FragmentHandle fh);
void loadFragment(Message3Registry& reg, ObjectHandle oh);
bool syncFragToStorage(FragmentHandle fh, Message3Registry& reg);
bool syncFragToStorage(ObjectHandle oh, Message3Registry& reg);
struct SaveQueueEntry final {
uint64_t ts_since_dirty{0};
//std::vector<uint8_t> id;
FragmentID id;
ObjectHandle id;
Message3Registry* reg{nullptr};
};
std::deque<SaveQueueEntry> _fuid_save_queue;
struct ECQueueEntry final {
FragmentID fid;
ObjectHandle fid;
Contact3 c;
};
std::deque<ECQueueEntry> _event_check_queue;
@ -118,7 +121,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS
MessageFragmentStore(
Contact3Registry& cr,
RegistryMessageModel& rmm,
FragmentStore& fs
ObjectStore2& os,
StorageBackendI& sb
);
virtual ~MessageFragmentStore(void);
@ -126,14 +130,12 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS
float tick(float time_delta);
void triggerScan(void);
protected: // rmm
bool onEvent(const Message::Events::MessageConstruct& e) override;
bool onEvent(const Message::Events::MessageUpdated& e) override;
protected: // fs
bool onEvent(const Fragment::Events::FragmentConstruct& e) override;
bool onEvent(const Fragment::Events::FragmentUpdated& e) override;
bool onEvent(const ObjectStore::Events::ObjectConstruct& e) override;
bool onEvent(const ObjectStore::Events::ObjectUpdate& e) override;
};

View File

@ -1,68 +0,0 @@
#pragma once
#include <entt/core/type_info.hpp>
#include <entt/container/dense_map.hpp>
#include <entt/entity/handle.hpp>
#include <nlohmann/json_fwd.hpp>
template<typename EntityType = entt::entity>
struct SerializerCallbacks {
using Registry = entt::basic_registry<EntityType>;
using Handle = entt::basic_handle<Registry>;
// nlohmann
// json/msgpack
using serialize_json_fn = bool(*)(const Handle h, nlohmann::json& out);
entt::dense_map<entt::id_type, serialize_json_fn> _serl_json;
using deserialize_json_fn = bool(*)(Handle h, const nlohmann::json& in);
entt::dense_map<entt::id_type, deserialize_json_fn> _deserl_json;
template<typename T>
static bool component_get_json(const Handle h, nlohmann::json& j) {
if (h.template all_of<T>()) {
if constexpr (!std::is_empty_v<T>) {
j = h.template get<T>();
}
return true;
}
return false;
}
template<typename T>
static bool component_emplace_or_replace_json(Handle h, const nlohmann::json& j) {
if constexpr (std::is_empty_v<T>) {
h.template emplace_or_replace<T>(); // assert empty json?
} else {
h.template emplace_or_replace<T>(static_cast<T>(j));
}
return true;
}
void registerSerializerJson(serialize_json_fn fn, const entt::type_info& type_info) {
_serl_json[type_info.hash()] = fn;
}
template<typename CompType>
void registerSerializerJson(
serialize_json_fn fn = component_get_json<CompType>,
const entt::type_info& type_info = entt::type_id<CompType>()
) {
registerSerializerJson(fn, type_info);
}
void registerDeSerializerJson(deserialize_json_fn fn, const entt::type_info& type_info) {
_deserl_json[type_info.hash()] = fn;
}
template<typename CompType>
void registerDeSerializerJson(
deserialize_json_fn fn = component_emplace_or_replace_json<CompType>,
const entt::type_info& type_info = entt::type_id<CompType>()
) {
registerDeSerializerJson(fn, type_info);
}
};

View File

@ -1,80 +0,0 @@
#include <cstdint>
#include <iostream>
#include "./fragment_store.hpp"
#include <nlohmann/json.hpp>
#include <entt/entity/handle.hpp>
namespace Components {
struct MessagesTimestampRange {
uint64_t begin {0};
uint64_t end {1000};
};
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesTimestampRange, begin, end)
} // Components
int main(void) {
FragmentStore fs;
fs._default_store_path = "test_store/";
fs._sc.registerSerializerJson<Components::MessagesTimestampRange>();
fs._sc.registerDeSerializerJson<Components::MessagesTimestampRange>();
const auto frag0 = fs.newFragmentFile("", MetaFileType::TEXT_JSON, {0xff, 0xf1, 0xf2, 0xf0, 0xff, 0xff, 0xff, 0xf9});
const auto frag1 = fs.newFragmentFile("", MetaFileType::BINARY_MSGPACK);
const auto frag2 = fs.newFragmentFile("", MetaFileType::BINARY_MSGPACK);
{
auto frag0h = fs.fragmentHandle(frag0);
frag0h.emplace_or_replace<FragComp::DataCompressionType>();
frag0h.emplace_or_replace<FragComp::DataEncryptionType>();
frag0h.emplace_or_replace<Components::MessagesTimestampRange>();
std::function<FragmentStore::write_to_storage_fetch_data_cb> fn_cb = [read = 0ul](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t {
uint64_t i = 0;
for (; i+read < 3000 && i < buffer_size; i++) {
request_buffer[i] = uint8_t((i+read) & 0xff);
}
read += i;
return i;
};
fs.syncToStorage(frag0, fn_cb);
}
{
auto frag1h = fs.fragmentHandle(frag1);
frag1h.emplace_or_replace<FragComp::DataCompressionType>().comp = Compression::ZSTD;
frag1h.emplace_or_replace<FragComp::DataEncryptionType>();
std::function<FragmentStore::write_to_storage_fetch_data_cb> fn_cb = [read = 0ul](uint8_t* request_buffer, uint64_t buffer_size) mutable -> uint64_t {
static constexpr std::string_view text = "This is some random data";
uint64_t i = 0;
for (; i+read < text.size() && i < buffer_size; i++) {
request_buffer[i] = text[i+read];
}
read += i;
return i;
};
fs.syncToStorage(frag1, fn_cb);
}
{
auto frag2h = fs.fragmentHandle(frag2);
frag2h.emplace_or_replace<FragComp::DataCompressionType>();
frag2h.emplace_or_replace<FragComp::DataEncryptionType>();
static constexpr std::string_view text = "This is more random data";
fs.syncToStorage(frag2, reinterpret_cast<const uint8_t*>(text.data()), text.size());
}
return 0;
}

View File

@ -16,7 +16,8 @@ MainScreen::MainScreen(SDL_Renderer* renderer_, std::string save_path, std::stri
renderer(renderer_),
rmm(cr),
mts(rmm),
mfs(cr, rmm, fs),
mfsb(os, "test2_message_store/"),
mfs(cr, rmm, os, mfsb),
tc(save_path, save_password),
tpi(tc.getTox()),
ad(tc),
@ -55,9 +56,7 @@ MainScreen::MainScreen(SDL_Renderer* renderer_, std::string save_path, std::stri
std::cout << "own address: " << tc.toxSelfGetAddressStr() << "\n";
{ // setup plugin instances
// TODO: make interface useful
g_provideInstance<FragmentStoreI>("FragmentStoreI", "host", &fs);
g_provideInstance<FragmentStore>("FragmentStore", "host", &fs);
g_provideInstance<ObjectStore2>("ObjectStore2", "host", &os);
g_provideInstance<ConfigModelI>("ConfigModelI", "host", &conf);
g_provideInstance<Contact3Registry>("Contact3Registry", "1", "host", &cr);
@ -85,7 +84,7 @@ MainScreen::MainScreen(SDL_Renderer* renderer_, std::string save_path, std::stri
conf.dump();
mfs.triggerScan(); // HACK: after plugins and tox contacts got loaded
mfsb.scanAsync(); // HACK: after plugins and tox contacts got loaded
}
MainScreen::~MainScreen(void) {

View File

@ -2,7 +2,8 @@
#include "./screen.hpp"
#include "./fragment_store/fragment_store.hpp"
#include "./fragment_store/object_store.hpp"
#include "./fragment_store/backends/filesystem_storage.hpp"
#include <solanaceae/util/simple_config_model.hpp>
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
@ -45,12 +46,13 @@ extern "C" {
struct MainScreen final : public Screen {
SDL_Renderer* renderer;
FragmentStore fs;
ObjectStore2 os;
SimpleConfigModel conf;
Contact3Registry cr;
RegistryMessageModel rmm;
MessageTimeSort mts;
backend::FilesystemStorage mfsb; // message fsb // TODO: make configurable
MessageFragmentStore mfs;
ToxEventLogger tel{std::cout};