more work on backend and moving frags to objs

This commit is contained in:
Green Sky 2024-04-09 19:36:35 +02:00
parent 0610a6a64a
commit 3cede91aa0
No known key found for this signature in database
12 changed files with 514 additions and 152 deletions

@ -80,6 +80,15 @@ target_link_libraries(test_fragment_store PUBLIC
########################################
# convert_frag_to_obj: stand-alone executable built from
# fragment_store/convert_frag_to_obj.cpp and linked against fragment_store
add_executable(convert_frag_to_obj
fragment_store/convert_frag_to_obj.cpp
)
target_link_libraries(convert_frag_to_obj PUBLIC
fragment_store
)
########################################
add_executable(tomato
./main.cpp
./icon.rc

@ -2,6 +2,9 @@
#include "../meta_components.hpp"
#include <solanaceae/util/utils.hpp>
#include <entt/container/dense_set.hpp>
#include <nlohmann/json.hpp>
#include <solanaceae/file/file2_std.hpp>
@ -22,9 +25,65 @@ static const char* metaFileTypeSuffix(MetaFileType mft) {
return ""; // .unk?
}
// TODO: move to ... somewhere. (span? file2i?)
// Turns the result of a read (either a ready-made ByteSpan or an owning byte
// vector) into a ByteSpan.
// For the vector alternative the returned span points at the vector's own
// storage inside data_var, so it is only usable while data_var is alive.
static ByteSpan spanFromRead(const std::variant<ByteSpan, std::vector<uint8_t>>& data_var) {
	if (const auto* owned_bytes = std::get_if<std::vector<uint8_t>>(&data_var); owned_bytes != nullptr) {
		return {owned_bytes->data(), owned_bytes->size()};
	}

	if (const auto* borrowed_span = std::get_if<ByteSpan>(&data_var); borrowed_span != nullptr) {
		return *borrowed_span;
	}

	// neither alternative is held
	assert(false);
	return {};
}
// TODO: move somewhere else
static bool serl_json_data_enc_type(const ObjectHandle oh, nlohmann::json& out) {
out = static_cast<std::underlying_type_t<Encryption>>(
oh.get<ObjComp::DataEncryptionType>().enc
);
return true;
}
// JSON deserializer for ObjComp::DataEncryptionType.
// Reads in as the underlying integer of Encryption and stores it on oh,
// replacing an already existing component.
// Returns false and leaves oh untouched if in is not an integer, instead of
// letting the json number conversion throw on malformed metadata.
static bool deserl_json_data_enc_type(ObjectHandle oh, const nlohmann::json& in) {
	if (!in.is_number_integer()) {
		// covers signed and unsigned integers, rejects everything else
		return false;
	}

	oh.emplace_or_replace<ObjComp::DataEncryptionType>(
		static_cast<Encryption>(
			in.get<std::underlying_type_t<Encryption>>()
		)
	);
	return true;
}
static bool serl_json_data_comp_type(const ObjectHandle oh, nlohmann::json& out) {
out = static_cast<std::underlying_type_t<Compression>>(
oh.get<ObjComp::DataCompressionType>().comp
);
return true;
}
// JSON deserializer for ObjComp::DataCompressionType.
// Reads in as the underlying integer of Compression and stores it on oh,
// replacing an already existing component.
// Returns false and leaves oh untouched if in is not an integer, instead of
// letting the json number conversion throw on malformed metadata.
static bool deserl_json_data_comp_type(ObjectHandle oh, const nlohmann::json& in) {
	if (!in.is_number_integer()) {
		// covers signed and unsigned integers, rejects everything else
		return false;
	}

	oh.emplace_or_replace<ObjComp::DataCompressionType>(
		static_cast<Compression>(
			in.get<std::underlying_type_t<Compression>>()
		)
	);
	return true;
}
namespace backend {
FilesystemStorage::FilesystemStorage(ObjectStore2& os) : StorageBackendI::StorageBackendI(os) {
// Sets up the backend for the given object store and remembers storage_path
// (copied into _storage_path). Also registers the json (de)serializers for the
// data encryption/compression type components with this backend's own
// serializer callbacks (_sc).
FilesystemStorage::FilesystemStorage(ObjectStore2& os, std::string_view storage_path)
	: StorageBackendI::StorageBackendI(os),
	  _storage_path(storage_path)
{
	// object components
	_sc.registerSerializerJson<ObjComp::DataEncryptionType>(serl_json_data_enc_type);
	_sc.registerDeSerializerJson<ObjComp::DataEncryptionType>(deserl_json_data_enc_type);

	_sc.registerSerializerJson<ObjComp::DataCompressionType>(serl_json_data_comp_type);
	_sc.registerDeSerializerJson<ObjComp::DataCompressionType>(deserl_json_data_comp_type);

	// old stuff: the fragment component types get the very same callbacks
	_sc.registerSerializerJson<FragComp::DataEncryptionType>(serl_json_data_enc_type);
	_sc.registerDeSerializerJson<FragComp::DataEncryptionType>(deserl_json_data_enc_type);

	_sc.registerSerializerJson<FragComp::DataCompressionType>(serl_json_data_comp_type);
	_sc.registerDeSerializerJson<FragComp::DataCompressionType>(deserl_json_data_comp_type);
}
FilesystemStorage::~FilesystemStorage(void) {
@ -39,38 +98,38 @@ bool FilesystemStorage::write(Object o, std::function<write_to_storage_fetch_dat
ObjectHandle oh {reg, o};
if (!oh.all_of<FragComp::Ephemeral::FilePath>()) {
if (!oh.all_of<ObjComp::Ephemeral::FilePath>()) {
// not a file fragment?
return false;
}
// split object storage
// split object storage (meta and data are 2 files)
MetaFileType meta_type = MetaFileType::TEXT_JSON; // TODO: better defaults
if (reg.all_of<FragComp::Ephemeral::MetaFileType>(o)) {
meta_type = oh.get<FragComp::Ephemeral::MetaFileType>().type;
MetaFileType meta_type = MetaFileType::TEXT_JSON; // TODO: better defaults?
if (reg.all_of<ObjComp::Ephemeral::MetaFileType>(o)) {
meta_type = oh.get<ObjComp::Ephemeral::MetaFileType>().type;
}
Encryption meta_enc = Encryption::NONE; // TODO: better defaults
Compression meta_comp = Compression::NONE; // TODO: better defaults
Encryption meta_enc = Encryption::NONE; // TODO: better defaults?
Compression meta_comp = Compression::NONE; // TODO: better defaults?
if (meta_type != MetaFileType::TEXT_JSON) {
if (oh.all_of<FragComp::Ephemeral::MetaEncryptionType>()) {
meta_enc = oh.get<FragComp::Ephemeral::MetaEncryptionType>().enc;
if (oh.all_of<ObjComp::Ephemeral::MetaEncryptionType>()) {
meta_enc = oh.get<ObjComp::Ephemeral::MetaEncryptionType>().enc;
}
if (oh.all_of<FragComp::Ephemeral::MetaCompressionType>()) {
meta_comp = oh.get<FragComp::Ephemeral::MetaCompressionType>().comp;
if (oh.all_of<ObjComp::Ephemeral::MetaCompressionType>()) {
meta_comp = oh.get<ObjComp::Ephemeral::MetaCompressionType>().comp;
}
} else {
// we cant have encryption or compression
// so we force NONE for TEXT JSON
oh.emplace_or_replace<FragComp::Ephemeral::MetaEncryptionType>(Encryption::NONE);
oh.emplace_or_replace<FragComp::Ephemeral::MetaCompressionType>(Compression::NONE);
oh.emplace_or_replace<ObjComp::Ephemeral::MetaEncryptionType>(Encryption::NONE);
oh.emplace_or_replace<ObjComp::Ephemeral::MetaCompressionType>(Compression::NONE);
}
std::filesystem::path meta_tmp_path = oh.get<FragComp::Ephemeral::FilePath>().path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp";
std::filesystem::path meta_tmp_path = oh.get<ObjComp::Ephemeral::FilePath>().path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp";
meta_tmp_path.replace_filename("." + meta_tmp_path.filename().generic_u8string());
// TODO: make meta comp work with mem compressor
//auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp);
@ -85,14 +144,14 @@ bool FilesystemStorage::write(Object o, std::function<write_to_storage_fetch_dat
Encryption data_enc = Encryption::NONE; // TODO: better defaults
Compression data_comp = Compression::NONE; // TODO: better defaults
if (oh.all_of<FragComp::DataEncryptionType>()) {
data_enc = oh.get<FragComp::DataEncryptionType>().enc;
if (oh.all_of<ObjComp::DataEncryptionType>()) {
data_enc = oh.get<ObjComp::DataEncryptionType>().enc;
}
if (oh.all_of<FragComp::DataCompressionType>()) {
data_comp = oh.get<FragComp::DataCompressionType>().comp;
if (oh.all_of<ObjComp::DataCompressionType>()) {
data_comp = oh.get<ObjComp::DataCompressionType>().comp;
}
std::filesystem::path data_tmp_path = oh.get<FragComp::Ephemeral::FilePath>().path + ".tmp";
std::filesystem::path data_tmp_path = oh.get<ObjComp::Ephemeral::FilePath>().path + ".tmp";
data_tmp_path.replace_filename("." + data_tmp_path.filename().generic_u8string());
auto data_file_stack = buildFileStackWrite(std::string_view{data_tmp_path.generic_u8string()}, data_enc, data_comp);
if (data_file_stack.empty()) {
@ -107,6 +166,7 @@ bool FilesystemStorage::write(Object o, std::function<write_to_storage_fetch_dat
nlohmann::json meta_data_j = nlohmann::json::object(); // metadata needs to be an object, null not allowed
// metadata file
// TODO: refactor extract to OS
for (const auto& [type_id, storage] : reg.storage()) {
if (!storage.contains(o)) {
continue;
@ -115,8 +175,8 @@ bool FilesystemStorage::write(Object o, std::function<write_to_storage_fetch_dat
//std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n";
// use type_id to find serializer
auto s_cb_it = _os._sc._serl_json.find(type_id);
if (s_cb_it == _os._sc._serl_json.end()) {
auto s_cb_it = _sc._serl_json.find(type_id);
if (s_cb_it == _sc._serl_json.end()) {
// could not find serializer, not saving
continue;
}
@ -140,7 +200,7 @@ bool FilesystemStorage::write(Object o, std::function<write_to_storage_fetch_dat
std::filesystem::remove(meta_tmp_path);
while (!data_file_stack.empty()) { data_file_stack.pop(); }
std::filesystem::remove(data_tmp_path);
std::cerr << "FS error: binary writer creation failed '" << oh.get<FragComp::Ephemeral::FilePath>().path << "'\n";
std::cerr << "FS error: binary writer creation failed '" << oh.get<ObjComp::Ephemeral::FilePath>().path << "'\n";
return false;
}
@ -152,7 +212,7 @@ bool FilesystemStorage::write(Object o, std::function<write_to_storage_fetch_dat
std::filesystem::remove(meta_tmp_path);
while (!data_file_stack.empty()) { data_file_stack.pop(); }
std::filesystem::remove(data_tmp_path);
std::cerr << "FS error: binary writer failed '" << oh.get<FragComp::Ephemeral::FilePath>().path << "'\n";
std::cerr << "FS error: binary writer failed '" << oh.get<ObjComp::Ephemeral::FilePath>().path << "'\n";
return false;
}
}
@ -203,27 +263,24 @@ bool FilesystemStorage::write(Object o, std::function<write_to_storage_fetch_dat
data_file_stack.top()->write({buffer.data(), buffer_actual_size});
} while (buffer_actual_size == buffer.size());
//meta_file.flush();
//meta_file.close();
// flush // TODO: use scope
while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
//data_file.flush();
//data_file.close();
while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
std::filesystem::rename(
meta_tmp_path,
oh.get<FragComp::Ephemeral::FilePath>().path + ".meta" + metaFileTypeSuffix(meta_type)
oh.get<ObjComp::Ephemeral::FilePath>().path + ".meta" + metaFileTypeSuffix(meta_type)
);
std::filesystem::rename(
data_tmp_path,
oh.get<FragComp::Ephemeral::FilePath>().path
oh.get<ObjComp::Ephemeral::FilePath>().path
);
// TODO: check return value of renames
if (oh.all_of<FragComp::Ephemeral::DirtyTag>()) {
oh.remove<FragComp::Ephemeral::DirtyTag>();
if (oh.all_of<ObjComp::Ephemeral::DirtyTag>()) {
oh.remove<ObjComp::Ephemeral::DirtyTag>();
}
return true;
@ -233,5 +290,261 @@ bool FilesystemStorage::read(Object o, std::function<read_from_storage_put_data_
return false;
}
// Scans this backend's configured storage path (_storage_path)
// by forwarding it to scanPathAsync().
void FilesystemStorage::scanAsync(void) {
scanPathAsync(_storage_path);
}
size_t FilesystemStorage::scanPath(std::string_view path) {
// TODO: extract so async can work (or/and make iteratable generator)
if (path.empty() || !std::filesystem::is_directory(path)) {
std::cerr << "FS error: scan path not a directory '" << path << "'\n";
return 0;
}
// step 1: make snapshot of files, validate metafiles and save id/path+meta.ext
// can be extra thread (if non vfs)
struct ObjFileEntry {
std::string id_str;
std::filesystem::path obj_path;
std::string meta_ext;
bool operator==(const ObjFileEntry& other) const {
// only compare by id
return id_str == other.id_str;
}
};
struct ObjFileEntryHash {
size_t operator()(const ObjFileEntry& it) const {
return entt::hashed_string(it.id_str.data(), it.id_str.size());
}
};
entt::dense_set<ObjFileEntry, ObjFileEntryHash> file_obj_list;
std::filesystem::path storage_path{path};
auto handle_file = [&](const std::filesystem::path& file_path) {
if (!std::filesystem::is_regular_file(file_path)) {
return;
}
// handle file
if (file_path.has_extension()) {
// skip over metadata, assuming only metafiles have extentions (might be wrong?)
// also skips temps
return;
}
auto relative_path = std::filesystem::proximate(file_path, storage_path);
std::string id_str = relative_path.generic_u8string();
// delete all '/'
id_str.erase(std::remove(id_str.begin(), id_str.end(), '/'), id_str.end());
if (id_str.size() % 2 != 0) {
std::cerr << "FS error: non hex object uid detected: '" << id_str << "'\n";
}
if (file_obj_list.contains(ObjFileEntry{id_str, {}, ""})) {
std::cerr << "FS error: object duplicate detected: '" << id_str << "'\n";
return; // skip
}
const char* meta_ext = ".meta.msgpack";
{ // find meta
// TODO: this as to know all possible extentions
bool has_meta_msgpack = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.msgpack");
bool has_meta_json = std::filesystem::is_regular_file(file_path.generic_u8string() + ".meta.json");
const size_t meta_sum =
(has_meta_msgpack?1:0) +
(has_meta_json?1:0)
;
if (meta_sum > 1) { // has multiple
std::cerr << "FS error: object with multiple meta files detected: " << id_str << "\n";
return; // skip
}
if (meta_sum == 0) {
std::cerr << "FS error: object missing meta file detected: " << id_str << "\n";
return; // skip
}
if (has_meta_json) {
meta_ext = ".meta.json";
}
}
file_obj_list.emplace(ObjFileEntry{
std::move(id_str),
file_path,
meta_ext
});
};
for (const auto& outer_path : std::filesystem::directory_iterator(storage_path)) {
if (std::filesystem::is_regular_file(outer_path)) {
handle_file(outer_path);
} else if (std::filesystem::is_directory(outer_path)) {
// subdir, part of id
for (const auto& inner_path : std::filesystem::directory_iterator(outer_path)) {
//if (std::filesystem::is_regular_file(inner_path)) {
//// handle file
//} // TODO: support deeper recursion?
handle_file(inner_path);
}
}
}
std::cout << "FS: scan found:\n";
for (const auto& it : file_obj_list) {
std::cout << " " << it.id_str << "\n";
}
// step 2: check if files preexist in reg
// main thread
// (merge into step 3 and do more error checking?)
// TODO: properly handle duplicates, dups form different backends might be legit
// important
#if 0
for (auto it = file_obj_list.begin(); it != file_obj_list.end();) {
auto id = hex2bin(it->id_str);
auto fid = getFragmentByID(id);
if (_reg.valid(fid)) {
// pre exising (handle differently??)
// check if store differs?
it = file_obj_list.erase(it);
} else {
it++;
}
}
#endif
std::vector<Object> scanned_objs;
// step 3: parse meta and insert into reg of non preexising
// main thread
// TODO: check timestamps of preexisting and reload? mark external/remote dirty?
for (const auto& it : file_obj_list) {
nlohmann::json j;
if (it.meta_ext == ".meta.msgpack") {
std::ifstream file(it.obj_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary);
if (!file.is_open()) {
std::cout << "FS error: failed opening meta " << it.obj_path << "\n";
continue;
}
// file is a msgpack within a msgpack
std::vector<uint8_t> full_meta_data;
{ // read meta file
// figure out size
file.seekg(0, file.end);
uint64_t file_size = file.tellg();
file.seekg(0, file.beg);
full_meta_data.resize(file_size);
file.read(reinterpret_cast<char*>(full_meta_data.data()), full_meta_data.size());
}
const auto meta_header_j = nlohmann::json::from_msgpack(full_meta_data, true, false);
if (!meta_header_j.is_array() || meta_header_j.size() < 4) {
std::cerr << "FS error: broken binary meta " << it.obj_path << "\n";
continue;
}
if (meta_header_j.at(0) != "SOLMET") {
std::cerr << "FS error: wrong magic '" << meta_header_j.at(0) << "' in meta " << it.obj_path << "\n";
continue;
}
Encryption meta_enc = meta_header_j.at(1);
if (meta_enc != Encryption::NONE) {
std::cerr << "FS error: unknown encryption " << it.obj_path << "\n";
continue;
}
Compression meta_comp = meta_header_j.at(2);
if (meta_comp != Compression::NONE && meta_comp != Compression::ZSTD) {
std::cerr << "FS error: unknown compression " << it.obj_path << "\n";
continue;
}
//const auto& meta_data_ref = meta_header_j.at(3).is_binary()?meta_header_j.at(3):meta_header_j.at(3).at("data");
if (!meta_header_j.at(3).is_binary()) {
std::cerr << "FS error: meta data not binary " << it.obj_path << "\n";
continue;
}
const nlohmann::json::binary_t& meta_data_ref = meta_header_j.at(3);
std::stack<std::unique_ptr<File2I>> binary_reader_stack;
binary_reader_stack.push(std::make_unique<File2MemR>(ByteSpan{meta_data_ref.data(), meta_data_ref.size()}));
if (!buildStackRead(binary_reader_stack, meta_enc, meta_comp)) {
std::cerr << "FS error: binary reader creation failed " << it.obj_path << "\n";
continue;
}
// HACK: read fixed amout of data, but this way if we have neither enc nor comp we pass the span through
auto binary_read_value = binary_reader_stack.top()->read(10*1024*1024); // is 10MiB large enough for meta?
const auto binary_read_span = spanFromRead(binary_read_value);
assert(binary_read_span.size < 10*1024*1024);
j = nlohmann::json::from_msgpack(binary_read_span, true, false);
} else if (it.meta_ext == ".meta.json") {
std::ifstream file(it.obj_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary);
if (!file.is_open()) {
std::cerr << "FS error: failed opening meta " << it.obj_path << "\n";
continue;
}
file >> j;
} else {
assert(false);
}
if (!j.is_object()) {
std::cerr << "FS error: json in meta is broken " << it.id_str << "\n";
continue;
}
// TODO: existing fragment file
//newFragmentFile();
ObjectHandle oh{_os.registry(), _os.registry().create()};
oh.emplace<ObjComp::Ephemeral::Backend>(this);
oh.emplace<ObjComp::ID>(hex2bin(it.id_str));
oh.emplace<ObjComp::Ephemeral::FilePath>(it.obj_path.generic_u8string());
for (const auto& [k, v] : j.items()) {
// type id from string hash
const auto type_id = entt::hashed_string(k.data(), k.size());
const auto deserl_fn_it = _sc._deserl_json.find(type_id);
if (deserl_fn_it != _sc._deserl_json.cend()) {
// TODO: check return value
deserl_fn_it->second(oh, v);
} else {
std::cerr << "FS warning: missing deserializer for meta key '" << k << "'\n";
}
}
scanned_objs.push_back(oh);
}
// TODO: mutex and move code to async and return this list ?
// throw new frag event here, after loading them all
for (const Object o : scanned_objs) {
_os.throwEventConstruct(o);
}
return scanned_objs.size();
}
// Scans path for objects.
// Despite the name, this currently just calls scanPath() directly on the
// calling thread and drops the returned object count.
void FilesystemStorage::scanPathAsync(std::string path) {
// add path to queue
// HACK: if path is known/any fragment is in the path, this operation blocks (non async)
scanPath(path); // TODO: make async and post result
}
} // backend

@ -5,14 +5,28 @@
namespace backend {
// Storage backend (StorageBackendI) that works on files in a directory.
// Can write objects to storage and scan its storage path for existing objects.
struct FilesystemStorage : public StorageBackendI {
// os: the object store this backend belongs to
// storage_path: directory used by scanAsync()
FilesystemStorage(ObjectStore2& os);
FilesystemStorage(ObjectStore2& os, std::string_view storage_path = "test_obj_store");
~FilesystemStorage(void);
// TODO: fix the path for this specific fs?
// directory that scanAsync() scans
std::string _storage_path;
// write object o to storage; returns false on failure
bool write(Object o, std::function<write_to_storage_fetch_data_cb>& data_cb) override;
// read object o from storage
bool read(Object o, std::function<read_from_storage_put_data_cb>& data_cb) override;
//// convenience function
//nlohmann::json loadFromStorageNJ(FragmentID fid);
// scan _storage_path for objects
void scanAsync(void);
private:
// scans path for objects, returns the number of objects created
size_t scanPath(std::string_view path);
// currently forwards to scanPath() without being asynchronous
void scanPathAsync(std::string path);
private:
// this thing needs to change and be facilitated over the OS
// but the json serializer are specific to the backend
SerializerCallbacks<Object> _sc;
};
} // backend

@ -0,0 +1,36 @@
#include "./object_store.hpp"
#include "./backends/filesystem_storage.hpp"
#include <filesystem>
#include <iostream>
// Conversion tool entry point.
// usage: <program> <input_folder> <output_folder>
//
// Creates the output folder if needed, sets up one object store with a
// filesystem backend for each folder and scans both folders.
// Returns 1 on wrong parameter count or if the input folder is not a
// directory, 0 otherwise.
int main(int argc, const char** argv) {
	if (argc != 3) {
		std::cerr << "wrong paramter count, do " << argv[0] << " <input_folder> <output_folder>\n";
		return 1;
	}

	if (!std::filesystem::is_directory(argv[1])) {
		std::cerr << "input folder is no folder\n";
		return 1; // nothing to convert, do not continue with a bogus source
	}

	std::filesystem::create_directories(argv[2]);

	// we are going to use 2 different OS for convenience, but could be done with 1 too
	ObjectStore2 os_src;
	ObjectStore2 os_dst;

	backend::FilesystemStorage fsb_src(os_src, argv[1]);
	backend::FilesystemStorage fsb_dst(os_dst, argv[2]);

	// add message fragment store too (adds meta?)

	// hookup events

	// perform scan (which triggers events)
	fsb_dst.scanAsync(); // fill with existing?
	fsb_src.scanAsync(); // the scan

	// done
	return 0;
}

@ -139,9 +139,9 @@ FragmentID FragmentStore::newFragmentFile(
_reg.emplace<FragComp::ID>(new_frag, id);
// file (info) comp
_reg.emplace<FragComp::Ephemeral::FilePath>(new_frag, fragment_file_path.generic_u8string());
_reg.emplace<ObjComp::Ephemeral::FilePath>(new_frag, fragment_file_path.generic_u8string());
_reg.emplace<FragComp::Ephemeral::MetaFileType>(new_frag, mft);
_reg.emplace<ObjComp::Ephemeral::MetaFileType>(new_frag, mft);
// meta needs to be synced to file
std::function<write_to_storage_fetch_data_cb> empty_data_cb = [](auto*, auto) -> uint64_t { return 0; };
@ -188,7 +188,7 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
return false;
}
if (!_reg.all_of<FragComp::Ephemeral::FilePath>(fid)) {
if (!_reg.all_of<ObjComp::Ephemeral::FilePath>(fid)) {
// not a file fragment?
return false;
}
@ -196,30 +196,30 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
// split object storage
MetaFileType meta_type = MetaFileType::TEXT_JSON; // TODO: better defaults
if (_reg.all_of<FragComp::Ephemeral::MetaFileType>(fid)) {
meta_type = _reg.get<FragComp::Ephemeral::MetaFileType>(fid).type;
if (_reg.all_of<ObjComp::Ephemeral::MetaFileType>(fid)) {
meta_type = _reg.get<ObjComp::Ephemeral::MetaFileType>(fid).type;
}
Encryption meta_enc = Encryption::NONE; // TODO: better defaults
Compression meta_comp = Compression::NONE; // TODO: better defaults
if (meta_type != MetaFileType::TEXT_JSON) {
if (_reg.all_of<FragComp::Ephemeral::MetaEncryptionType>(fid)) {
meta_enc = _reg.get<FragComp::Ephemeral::MetaEncryptionType>(fid).enc;
if (_reg.all_of<ObjComp::Ephemeral::MetaEncryptionType>(fid)) {
meta_enc = _reg.get<ObjComp::Ephemeral::MetaEncryptionType>(fid).enc;
}
if (_reg.all_of<FragComp::Ephemeral::MetaCompressionType>(fid)) {
meta_comp = _reg.get<FragComp::Ephemeral::MetaCompressionType>(fid).comp;
if (_reg.all_of<ObjComp::Ephemeral::MetaCompressionType>(fid)) {
meta_comp = _reg.get<ObjComp::Ephemeral::MetaCompressionType>(fid).comp;
}
} else {
// we cant have encryption or compression
// so we force NONE for TEXT JSON
_reg.emplace_or_replace<FragComp::Ephemeral::MetaEncryptionType>(fid, Encryption::NONE);
_reg.emplace_or_replace<FragComp::Ephemeral::MetaCompressionType>(fid, Compression::NONE);
_reg.emplace_or_replace<ObjComp::Ephemeral::MetaEncryptionType>(fid, Encryption::NONE);
_reg.emplace_or_replace<ObjComp::Ephemeral::MetaCompressionType>(fid, Compression::NONE);
}
std::filesystem::path meta_tmp_path = _reg.get<FragComp::Ephemeral::FilePath>(fid).path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp";
std::filesystem::path meta_tmp_path = _reg.get<ObjComp::Ephemeral::FilePath>(fid).path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp";
meta_tmp_path.replace_filename("." + meta_tmp_path.filename().generic_u8string());
// TODO: make meta comp work with mem compressor
//auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp);
@ -241,7 +241,7 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
data_comp = _reg.get<FragComp::DataCompressionType>(fid).comp;
}
std::filesystem::path data_tmp_path = _reg.get<FragComp::Ephemeral::FilePath>(fid).path + ".tmp";
std::filesystem::path data_tmp_path = _reg.get<ObjComp::Ephemeral::FilePath>(fid).path + ".tmp";
data_tmp_path.replace_filename("." + data_tmp_path.filename().generic_u8string());
auto data_file_stack = buildFileStackWrite(std::string_view{data_tmp_path.generic_u8string()}, data_enc, data_comp);
if (data_file_stack.empty()) {
@ -289,7 +289,7 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
std::filesystem::remove(meta_tmp_path);
while (!data_file_stack.empty()) { data_file_stack.pop(); }
std::filesystem::remove(data_tmp_path);
std::cerr << "FS error: binary writer creation failed '" << _reg.get<FragComp::Ephemeral::FilePath>(fid).path << "'\n";
std::cerr << "FS error: binary writer creation failed '" << _reg.get<ObjComp::Ephemeral::FilePath>(fid).path << "'\n";
return false;
}
@ -301,7 +301,7 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
std::filesystem::remove(meta_tmp_path);
while (!data_file_stack.empty()) { data_file_stack.pop(); }
std::filesystem::remove(data_tmp_path);
std::cerr << "FS error: binary writer failed '" << _reg.get<FragComp::Ephemeral::FilePath>(fid).path << "'\n";
std::cerr << "FS error: binary writer failed '" << _reg.get<ObjComp::Ephemeral::FilePath>(fid).path << "'\n";
return false;
}
}
@ -361,18 +361,18 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
std::filesystem::rename(
meta_tmp_path,
_reg.get<FragComp::Ephemeral::FilePath>(fid).path + ".meta" + metaFileTypeSuffix(meta_type)
_reg.get<ObjComp::Ephemeral::FilePath>(fid).path + ".meta" + metaFileTypeSuffix(meta_type)
);
std::filesystem::rename(
data_tmp_path,
_reg.get<FragComp::Ephemeral::FilePath>(fid).path
_reg.get<ObjComp::Ephemeral::FilePath>(fid).path
);
// TODO: check return value of renames
if (_reg.all_of<FragComp::Ephemeral::DirtyTag>(fid)) {
_reg.remove<FragComp::Ephemeral::DirtyTag>(fid);
if (_reg.all_of<ObjComp::Ephemeral::DirtyTag>(fid)) {
_reg.remove<ObjComp::Ephemeral::DirtyTag>(fid);
}
return true;
@ -396,13 +396,13 @@ bool FragmentStore::loadFromStorage(FragmentID fid, std::function<read_from_stor
return false;
}
if (!_reg.all_of<FragComp::Ephemeral::FilePath>(fid)) {
if (!_reg.all_of<ObjComp::Ephemeral::FilePath>(fid)) {
// not a file fragment?
// TODO: memory fragments
return false;
}
const auto& frag_path = _reg.get<FragComp::Ephemeral::FilePath>(fid).path;
const auto& frag_path = _reg.get<ObjComp::Ephemeral::FilePath>(fid).path;
// TODO: check if metadata dirty?
// TODO: what if file changed on disk?
@ -672,7 +672,7 @@ size_t FragmentStore::scanStoragePath(std::string_view path) {
FragmentHandle fh{_reg, _reg.create()};
fh.emplace<FragComp::ID>(hex2bin(it.id_str));
fh.emplace<FragComp::Ephemeral::FilePath>(it.frag_path.generic_u8string());
fh.emplace<ObjComp::Ephemeral::FilePath>(it.frag_path.generic_u8string());
for (const auto& [k, v] : j.items()) {
// type id from string hash

@ -94,9 +94,5 @@ struct FragmentStore : public FragmentStoreI {
private:
void registerSerializers(void); // internal comps
// internal actual backends
// TODO: seperate out
bool syncToMemory(FragmentID fid, std::function<write_to_storage_fetch_data_cb>& data_cb);
bool syncToFile(FragmentID fid, std::function<write_to_storage_fetch_data_cb>& data_cb);
};

@ -68,7 +68,7 @@ namespace Message::Components {
} // Message::Components
namespace Fragment::Components {
namespace ObjectStore::Components {
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesTSRange, begin, end)
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesContact, id)
@ -82,7 +82,7 @@ namespace Fragment::Components {
Contact3 e {entt::null};
};
}
} // Fragment::Component
} // ObjectStore::Component
void MessageFragmentStore::handleMessage(const Message3Handle& m) {
if (_fs_ignore_event) {
@ -133,7 +133,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
assert(static_cast<bool>(fh));
// assuming ts range exists
auto& fts_comp = fh.get<FragComp::MessagesTSRange>();
auto& fts_comp = fh.get<ObjComp::MessagesTSRange>();
if (fts_comp.begin <= msg_ts && fts_comp.end >= msg_ts) {
fragment_id = fid;
@ -149,7 +149,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
assert(static_cast<bool>(fh));
// assuming ts range exists
auto& fts_comp = fh.get<FragComp::MessagesTSRange>();
auto& fts_comp = fh.get<ObjComp::MessagesTSRange>();
const int64_t frag_range = int64_t(fts_comp.end) - int64_t(fts_comp.begin);
constexpr static int64_t max_frag_ts_extent {1000*60*60};
@ -205,17 +205,17 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
fragment_id = fh;
fh.emplace_or_replace<FragComp::Ephemeral::MetaCompressionType>().comp = Compression::ZSTD;
fh.emplace_or_replace<FragComp::DataCompressionType>().comp = Compression::ZSTD;
fh.emplace_or_replace<ObjComp::Ephemeral::MetaCompressionType>().comp = Compression::ZSTD;
fh.emplace_or_replace<ObjComp::DataCompressionType>().comp = Compression::ZSTD;
auto& new_ts_range = fh.emplace_or_replace<FragComp::MessagesTSRange>();
auto& new_ts_range = fh.emplace_or_replace<ObjComp::MessagesTSRange>();
new_ts_range.begin = msg_ts;
new_ts_range.end = msg_ts;
{
const auto msg_reg_contact = m.registry()->ctx().get<Contact3>();
if (_cr.all_of<Contact::Components::ID>(msg_reg_contact)) {
fh.emplace<FragComp::MessagesContact>(_cr.get<Contact::Components::ID>(msg_reg_contact).data);
fh.emplace<ObjComp::MessagesContact>(_cr.get<Contact::Components::ID>(msg_reg_contact).data);
} else {
// ? rage quit?
}
@ -235,7 +235,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
fid_open.emplace(fragment_id);
std::cout << "MFS: created new fragment " << bin2hex(fh.get<FragComp::ID>().v) << "\n";
std::cout << "MFS: created new fragment " << bin2hex(fh.get<ObjComp::ID>().v) << "\n";
_fs_ignore_event = true;
_fs.throwEventConstruct(fh);
@ -296,13 +296,13 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh
if (!j.is_array()) {
// wrong data
fh.emplace_or_replace<FragComp::Ephemeral::MessagesEmptyTag>();
fh.emplace_or_replace<ObjComp::Ephemeral::MessagesEmptyTag>();
return;
}
if (j.size() == 0) {
// empty array
fh.emplace_or_replace<FragComp::Ephemeral::MessagesEmptyTag>();
fh.emplace_or_replace<ObjComp::Ephemeral::MessagesEmptyTag>();
return;
}
@ -389,12 +389,12 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh
if (messages_new_or_updated == 0) {
// useless frag
// TODO: unload?
fh.emplace_or_replace<FragComp::Ephemeral::MessagesEmptyTag>();
fh.emplace_or_replace<ObjComp::Ephemeral::MessagesEmptyTag>();
}
}
bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry& reg) {
auto& ftsrange = fh.get_or_emplace<FragComp::MessagesTSRange>(Message::getTimeMS(), Message::getTimeMS());
auto& ftsrange = fh.get_or_emplace<ObjComp::MessagesTSRange>(Message::getTimeMS(), Message::getTimeMS());
auto j = nlohmann::json::array();
@ -476,10 +476,16 @@ MessageFragmentStore::MessageFragmentStore(
_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);
_fs._sc.registerSerializerJson<FragComp::MessagesTSRange>();
_fs._sc.registerDeSerializerJson<FragComp::MessagesTSRange>();
_fs._sc.registerSerializerJson<FragComp::MessagesContact>();
_fs._sc.registerDeSerializerJson<FragComp::MessagesContact>();
_fs._sc.registerSerializerJson<ObjComp::MessagesTSRange>();
_fs._sc.registerDeSerializerJson<ObjComp::MessagesTSRange>();
_fs._sc.registerSerializerJson<ObjComp::MessagesContact>();
_fs._sc.registerDeSerializerJson<ObjComp::MessagesContact>();
// old
_fs._sc.registerSerializerJson<FragComp::MessagesTSRange>(_fs._sc.component_get_json<ObjComp::MessagesTSRange>);
_fs._sc.registerDeSerializerJson<FragComp::MessagesTSRange>(_fs._sc.component_emplace_or_replace_json<ObjComp::MessagesTSRange>);
_fs._sc.registerSerializerJson<FragComp::MessagesContact>(_fs._sc.component_get_json<ObjComp::MessagesContact>);
_fs._sc.registerDeSerializerJson<FragComp::MessagesContact>(_fs._sc.component_emplace_or_replace_json<ObjComp::MessagesContact>);
_fs.subscribe(this, FragmentStore_Event::fragment_construct);
_fs.subscribe(this, FragmentStore_Event::fragment_updated);
@ -589,12 +595,12 @@ float MessageFragmentStore::tick(float) {
return 0.05f;
}
if (!fh.all_of<FragComp::MessagesTSRange>()) {
if (!fh.all_of<ObjComp::MessagesTSRange>()) {
return 0.05f;
}
// get ts range of frag and collide with all curser(s/ranges)
const auto& frag_range = fh.get<FragComp::MessagesTSRange>();
const auto& frag_range = fh.get<ObjComp::MessagesTSRange>();
auto* msg_reg = _rmm.get(c);
if (msg_reg == nullptr) {
@ -646,19 +652,19 @@ float MessageFragmentStore::tick(float) {
return 0.05f;
}
if (!fh.all_of<FragComp::MessagesTSRange>()) {
if (!fh.all_of<ObjComp::MessagesTSRange>()) {
std::cerr << "MFS error: frag has no range\n";
// ????
msg_reg->ctx().get<Message::Components::ContactFragments>().erase(fid);
return 0.05f;
}
if (fh.all_of<FragComp::Ephemeral::MessagesEmptyTag>()) {
if (fh.all_of<ObjComp::Ephemeral::MessagesEmptyTag>()) {
continue; // skip known empty
}
// get ts range of frag and collide with all curser(s/ranges)
const auto& [range_begin, range_end] = fh.get<FragComp::MessagesTSRange>();
const auto& [range_begin, range_end] = fh.get<ObjComp::MessagesTSRange>();
if (rangeVisible(range_begin, range_end, *msg_reg)) {
std::cout << "MFS: frag hit by vis range\n";
@ -691,7 +697,7 @@ float MessageFragmentStore::tick(float) {
cf.sorted_end.crend(),
ts_begin_comp.ts,
[&](const FragmentID element, const auto& value) -> bool {
return _fs._reg.get<FragComp::MessagesTSRange>(element).end >= value;
return _fs._reg.get<ObjComp::MessagesTSRange>(element).end >= value;
}
);
@ -709,7 +715,7 @@ float MessageFragmentStore::tick(float) {
// only ok bc next is cheap
for (size_t i = 0; i < 5 && _fs._reg.valid(next_frag); next_frag = cf.next(next_frag)) {
auto fh = _fs.fragmentHandle(next_frag);
if (fh.any_of<FragComp::Ephemeral::MessagesEmptyTag>()) {
if (fh.any_of<ObjComp::Ephemeral::MessagesEmptyTag>()) {
continue; // skip known empty
}
@ -739,7 +745,7 @@ float MessageFragmentStore::tick(float) {
cf.sorted_begin.cend(),
ts_end,
[&](const FragmentID element, const auto& value) -> bool {
return _fs._reg.get<FragComp::MessagesTSRange>(element).begin < value;
return _fs._reg.get<ObjComp::MessagesTSRange>(element).begin < value;
}
);
@ -757,7 +763,7 @@ float MessageFragmentStore::tick(float) {
// only ok bc next is cheap
for (size_t i = 0; i < 5 && _fs._reg.valid(prev_frag); prev_frag = cf.prev(prev_frag)) {
auto fh = _fs.fragmentHandle(prev_frag);
if (fh.any_of<FragComp::Ephemeral::MessagesEmptyTag>()) {
if (fh.any_of<ObjComp::Ephemeral::MessagesEmptyTag>()) {
continue; // skip known empty
}
@ -806,7 +812,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e)
return false; // skip self
}
if (!e.e.all_of<FragComp::MessagesTSRange, FragComp::MessagesContact>()) {
if (!e.e.all_of<ObjComp::MessagesTSRange, ObjComp::MessagesContact>()) {
return false; // not for us
}
@ -814,7 +820,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e)
Contact3 frag_contact = entt::null;
{ // get contact
const auto& frag_contact_id = e.e.get<FragComp::MessagesContact>().id;
const auto& frag_contact_id = e.e.get<ObjComp::MessagesContact>().id;
// TODO: id lookup table, this is very inefficient
for (const auto& [c_it, id_it] : _cr.view<Contact::Components::ID>().each()) {
if (frag_contact_id == id_it.data) {
@ -826,7 +832,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e)
// unknown contact
return false;
}
e.e.emplace_or_replace<FragComp::Ephemeral::MessagesContactEntity>(frag_contact);
e.e.emplace_or_replace<ObjComp::Ephemeral::MessagesContactEntity>(frag_contact);
}
// create if not exist
@ -853,23 +859,23 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentUpdated& e) {
return false; // skip self
}
if (!e.e.all_of<FragComp::MessagesTSRange, FragComp::MessagesContact>()) {
if (!e.e.all_of<ObjComp::MessagesTSRange, ObjComp::MessagesContact>()) {
return false; // not for us
}
// since its an update, we might have it associated, or not
// its also possible it was tagged as empty
e.e.remove<FragComp::Ephemeral::MessagesEmptyTag>();
e.e.remove<ObjComp::Ephemeral::MessagesEmptyTag>();
Contact3 frag_contact = entt::null;
{ // get contact
// probably cached already
if (e.e.all_of<FragComp::Ephemeral::MessagesContactEntity>()) {
frag_contact = e.e.get<FragComp::Ephemeral::MessagesContactEntity>().e;
if (e.e.all_of<ObjComp::Ephemeral::MessagesContactEntity>()) {
frag_contact = e.e.get<ObjComp::Ephemeral::MessagesContactEntity>().e;
}
if (!_cr.valid(frag_contact)) {
const auto& frag_contact_id = e.e.get<FragComp::MessagesContact>().id;
const auto& frag_contact_id = e.e.get<ObjComp::MessagesContact>().id;
// TODO: id lookup table, this is very inefficient
for (const auto& [c_it, id_it] : _cr.view<Contact::Components::ID>().each()) {
if (frag_contact_id == id_it.data) {
@ -881,7 +887,7 @@ bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentUpdated& e) {
// unknown contact
return false;
}
e.e.emplace_or_replace<FragComp::Ephemeral::MessagesContactEntity>(frag_contact);
e.e.emplace_or_replace<ObjComp::Ephemeral::MessagesContactEntity>(frag_contact);
}
}
@ -921,15 +927,15 @@ bool Message::Components::ContactFragments::insert(FragmentHandle frag) {
sorted_begin.cbegin(),
sorted_begin.cend(),
[frag](const FragmentID a) -> bool {
const auto begin_a = frag.registry()->get<FragComp::MessagesTSRange>(a).begin;
const auto begin_frag = frag.get<FragComp::MessagesTSRange>().begin;
const auto begin_a = frag.registry()->get<ObjComp::MessagesTSRange>(a).begin;
const auto begin_frag = frag.get<ObjComp::MessagesTSRange>().begin;
if (begin_a > begin_frag) {
return true;
} else if (begin_a < begin_frag) {
return false;
} else {
// equal ts, we need to fall back to id (id can not be equal)
return isLess(frag.get<FragComp::ID>().v, frag.registry()->get<FragComp::ID>(a).v);
return isLess(frag.get<ObjComp::ID>().v, frag.registry()->get<ObjComp::ID>(a).v);
}
}
);
@ -946,15 +952,15 @@ bool Message::Components::ContactFragments::insert(FragmentHandle frag) {
sorted_end.cbegin(),
sorted_end.cend(),
[frag](const FragmentID a) -> bool {
const auto end_a = frag.registry()->get<FragComp::MessagesTSRange>(a).end;
const auto end_frag = frag.get<FragComp::MessagesTSRange>().end;
const auto end_a = frag.registry()->get<ObjComp::MessagesTSRange>(a).end;
const auto end_frag = frag.get<ObjComp::MessagesTSRange>().end;
if (end_a > end_frag) {
return true;
} else if (end_a < end_frag) {
return false;
} else {
// equal ts, we need to fall back to id (id can not be equal)
return isLess(frag.get<FragComp::ID>().v, frag.registry()->get<FragComp::ID>(a).v);
return isLess(frag.get<ObjComp::ID>().v, frag.registry()->get<ObjComp::ID>(a).v);
}
}
);

@ -54,7 +54,7 @@ namespace Message::Components {
} // Message::Components
namespace Fragment::Components {
namespace ObjectStore::Components {
struct MessagesTSRange {
// timestamp range within the fragment
uint64_t begin {0}; // newer msg -> higher number
@ -67,6 +67,12 @@ namespace Fragment::Components {
// TODO: add src contact (self id)
} // ObjectStore::Components
// old
// The previous Fragment::Components names for the message components.
// Each struct derives publicly from the ObjComp type of the same name and
// adds no members of its own.
namespace Fragment::Components {
struct MessagesTSRange : public ObjComp::MessagesTSRange {};
struct MessagesContact : public ObjComp::MessagesContact {};
} // Fragment::Components
// handles fragments for messages

@ -1,12 +1,13 @@
#pragma once
#include "./types.hpp"
#include "object_store.hpp"
#include <vector>
#include <string>
#include <cstdint>
namespace Fragment::Components {
namespace ObjectStore::Components {
// TODO: is this special and should this be saved to meta or not (its already in the file name on disk)
struct ID {
@ -25,18 +26,6 @@ namespace Fragment::Components {
// meta that is not written to (meta-)file
namespace Ephemeral {
// excluded from file meta
// Component with a single std::string member holding a path.
struct FilePath {
// contains store path, if any
std::string path;
};
// Empty tag type (no members).
// TODO: separate into remote and local?
// (remote meaning eg. the file on disk was changed by another program)
struct DirtyTag {};
// type as comp
// Wraps the global ::MetaFileType enum in a component struct; the member
// defaults to ::MetaFileType::TEXT_JSON.
struct MetaFileType {
::MetaFileType type {::MetaFileType::TEXT_JSON};
};
@ -49,11 +38,38 @@ namespace Fragment::Components {
Compression comp {Compression::NONE};
};
// Component holding a raw pointer to a StorageBackendI.
// The struct does not manage the pointee's lifetime.
// The pointer defaults to nullptr so that a default-initialized Backend never
// carries an indeterminate value (matches the other components, which all
// give their members a default).
struct Backend {
	// TODO: shared_ptr instead??
	StorageBackendI* ptr {nullptr};
};
// excluded from file meta
// TODO: move to backend specific
// Component with a single std::string member holding a path.
struct FilePath {
// contains store path, if any
std::string path;
};
// Empty tag type (no members).
// TODO: separate into remote and local?
// (remote meaning eg. the file on disk was changed by another program)
struct DirtyTag {};
} // Ephemeral
} // Components
// shortened to save bytes (until I find a way to save by ID in msgpack)
namespace ObjComp = ObjectStore::Components;
// old names
// The previous Fragment::Components spellings of the core components.
// Each struct derives publicly from the ObjComp type of the same name and
// adds no members of its own.
namespace Fragment::Components {
// earlier empty variants, left commented out
//struct ID {};
//struct DataEncryptionType {};
//struct DataCompressionType {};
struct ID : public ObjComp::ID {};
struct DataEncryptionType : public ObjComp::DataEncryptionType {};
struct DataCompressionType : public ObjComp::DataCompressionType {};
}
namespace FragComp = Fragment::Components;
#include "./meta_components_id.inl"

@ -18,6 +18,10 @@ constexpr std::string_view entt::type_name<x>::value() noexcept { \
// cross compiler stable ids
// Each DEFINE_COMP_ID(x) provides entt::type_name<x>::value() for the type x.
DEFINE_COMP_ID(ObjComp::DataEncryptionType)
DEFINE_COMP_ID(ObjComp::DataCompressionType)
// old stuff
// The FragComp names get their own entries alongside the ObjComp ones.
DEFINE_COMP_ID(FragComp::DataEncryptionType)
DEFINE_COMP_ID(FragComp::DataCompressionType)

@ -22,43 +22,7 @@ bool StorageBackendI::write(Object o, const ByteSpan data) {
return write(o, fn_cb);
}
static bool serl_json_data_enc_type(const ObjectHandle oh, nlohmann::json& out) {
out = static_cast<std::underlying_type_t<Encryption>>(
oh.get<FragComp::DataEncryptionType>().enc
);
return true;
}
// Converts the JSON value `in` to the Encryption enum's underlying integer
// type, casts it to Encryption and stores it on the object as a
// DataEncryptionType component (emplacing or replacing).
// Always returns true.
static bool deserl_json_data_enc_type(ObjectHandle oh, const nlohmann::json& in) {
	using EncInt = std::underlying_type_t<Encryption>;

	// convert first, then build the component from the resulting enum value
	const auto raw_value = static_cast<EncInt>(in);
	oh.emplace_or_replace<FragComp::DataEncryptionType>(static_cast<Encryption>(raw_value));

	return true;
}
static bool serl_json_data_comp_type(const ObjectHandle oh, nlohmann::json& out) {
out = static_cast<std::underlying_type_t<Compression>>(
oh.get<FragComp::DataCompressionType>().comp
);
return true;
}
// Converts the JSON value `in` to the Compression enum's underlying integer
// type, casts it to Compression and stores it on the object as a
// DataCompressionType component (emplacing or replacing).
// Always returns true.
static bool deserl_json_data_comp_type(ObjectHandle oh, const nlohmann::json& in) {
	using CompInt = std::underlying_type_t<Compression>;

	// convert first, then build the component from the resulting enum value
	const auto raw_value = static_cast<CompInt>(in);
	oh.emplace_or_replace<FragComp::DataCompressionType>(static_cast<Compression>(raw_value));

	return true;
}
// Registers the JSON serializer and deserializer callbacks for the data
// encryption and data compression type components on _sc.
ObjectStore2::ObjectStore2(void) {
	auto& callbacks = _sc;

	// encryption type
	callbacks.registerSerializerJson<FragComp::DataEncryptionType>(serl_json_data_enc_type);
	callbacks.registerDeSerializerJson<FragComp::DataEncryptionType>(deserl_json_data_enc_type);

	// compression type
	callbacks.registerSerializerJson<FragComp::DataCompressionType>(serl_json_data_comp_type);
	callbacks.registerDeSerializerJson<FragComp::DataCompressionType>(deserl_json_data_comp_type);
}
ObjectStore2::~ObjectStore2(void) {

@ -75,8 +75,6 @@ struct ObjectStore2 : public ObjectStoreEventProviderI {
ObjectRegistry _reg;
SerializerCallbacks<Object> _sc;
// TODO: default backend?
ObjectStore2(void);