basically working, but some dup glitch is still there

This commit is contained in:
Green Sky 2024-02-17 11:49:03 +01:00
parent 97aedca844
commit 3d0863ff9a
No known key found for this signature in database
5 changed files with 203 additions and 42 deletions

View File

@ -370,6 +370,64 @@ bool FragmentStore::syncToStorage(FragmentID fid, const uint8_t* data, const uin
return syncToStorage(fid, fn_cb);
}
bool FragmentStore::loadFromStorage(FragmentID fid, std::function<read_from_storage_put_data_cb>& data_cb) {
if (!_reg.valid(fid)) {
return false;
}
if (!_reg.all_of<FragComp::Ephemeral::FilePath>(fid)) {
// not a file fragment?
// TODO: memory fragments
return false;
}
const auto& frag_path = _reg.get<FragComp::Ephemeral::FilePath>(fid).path;
// TODO: check if metadata dirty?
// TODO: what if file changed on disk?
std::cout << "FS: loading fragment '" << frag_path << "'\n";
std::ifstream data_file{
frag_path,
std::ios::in | std::ios::binary // always binary, also for text
};
if (!data_file.is_open()) {
std::cerr << "FS error: fragment data file failed to open '" << frag_path << "'\n";
// error
return false;
}
std::array<uint8_t, 1024> buffer;
uint64_t buffer_actual_size {0};
do {
data_file.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
buffer_actual_size = data_file.gcount();
if (buffer_actual_size == 0) {
break;
}
data_cb(buffer.data(), buffer_actual_size);
} while (buffer_actual_size == buffer.size() && !data_file.eof());
return true;
}
nlohmann::json FragmentStore::loadFromStorageNJ(FragmentID fid) {
std::vector<uint8_t> tmp_buffer;
std::function<read_from_storage_put_data_cb> cb = [&tmp_buffer](const uint8_t* buffer, const uint64_t buffer_size) {
tmp_buffer.insert(tmp_buffer.end(), buffer, buffer+buffer_size);
};
if (!loadFromStorage(fid, cb)) {
return nullptr;
}
return nlohmann::json::parse(tmp_buffer);
}
size_t FragmentStore::scanStoragePath(std::string_view path) {
if (path.empty()) {
path = _default_store_path;
@ -505,7 +563,7 @@ size_t FragmentStore::scanStoragePath(std::string_view path) {
// read binary header
assert(false);
} else if (it.meta_ext == ".meta.json") {
std::ifstream file(it.frag_path.generic_u8string() + it.meta_ext);
std::ifstream file(it.frag_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary);
if (!file.is_open()) {
std::cout << "FS error: failed opening meta " << it.frag_path << "\n";
continue;
@ -523,6 +581,8 @@ size_t FragmentStore::scanStoragePath(std::string_view path) {
FragmentHandle fh{_reg, _reg.create()};
fh.emplace<FragComp::ID>(hex2bin(it.id_str));
fh.emplace<FragComp::Ephemeral::FilePath>(it.frag_path.generic_u8string());
for (const auto& [k, v] : j.items()) {
// type id from string hash
const auto type_id = entt::hashed_string(k.data(), k.size());

View File

@ -82,6 +82,12 @@ struct FragmentStore : public FragmentStoreI {
bool syncToStorage(FragmentID fid, std::function<write_to_storage_fetch_data_cb>& data_cb);
bool syncToStorage(FragmentID fid, const uint8_t* data, const uint64_t data_size);
// ========== load fragment data from storage ==========
using read_from_storage_put_data_cb = void(const uint8_t* buffer, const uint64_t buffer_size);
bool loadFromStorage(FragmentID fid, std::function<read_from_storage_put_data_cb>& data_cb);
// convenience function
nlohmann::json loadFromStorageNJ(FragmentID fid);
// fragment discovery?
// returns number of new fragments
size_t scanStoragePath(std::string_view path);

View File

@ -7,10 +7,13 @@
#include <nlohmann/json.hpp>
#include <string>
#include <cstdint>
#include <cassert>
#include <iostream>
// https://youtu.be/CU2exyhYPfA
namespace Message::Components {
// ctx
@ -46,43 +49,6 @@ namespace Fragment::Components {
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(MessagesContact, id)
} // Fragment::Components
template<typename T>
static bool serl_json_default(void* comp, nlohmann::json& out) {
if constexpr (!std::is_empty_v<T>) {
out = *reinterpret_cast<T*>(comp);
} // do nothing if empty type
return true;
}
static bool serl_json_msg_ts_range(void* comp, nlohmann::json& out) {
if (comp == nullptr) {
return false;
}
out = nlohmann::json::object();
auto& r_comp = *reinterpret_cast<FragComp::MessagesTSRange*>(comp);
out["begin"] = r_comp.begin;
out["end"] = r_comp.end;
return true;
}
static bool serl_json_msg_c_id(void* comp, nlohmann::json& out) {
if (comp == nullptr) {
return false;
}
out = nlohmann::json::object();
auto& r_comp = *reinterpret_cast<FragComp::MessagesContact*>(comp);
out["id"] = r_comp.id;
return true;
}
void MessageFragmentStore::handleMessage(const Message3Handle& m) {
if (!static_cast<bool>(m)) {
return; // huh?
@ -95,6 +61,30 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
if (!m.registry()->ctx().contains<Message::Components::OpenFragments>()) {
// first message in this reg
m.registry()->ctx().emplace<Message::Components::OpenFragments>();
// TODO: move this to async
// new reg -> load all fragments for this contact (for now, ranges later)
for (const auto& [fid, tsrange, fmc] : _fs._reg.view<FragComp::MessagesTSRange, FragComp::MessagesContact>().each()) {
Contact3 frag_contact = entt::null;
// TODO: id lookup table, this is very inefficent
for (const auto& [c_it, id_it] : _cr.view<Contact::Components::ID>().each()) {
if (fmc.id == id_it.data) {
//h.emplace_or_replace<Message::Components::ContactTo>(c_it);
//return true;
frag_contact = c_it;
break;
}
}
if (!_cr.valid(frag_contact)) {
// unkown contact
continue;
}
// registry is the same as the one the message event is for
if (static_cast<const RegistryMessageModel&>(_rmm).get(frag_contact) == m.registry()) {
loadFragment(*m.registry(), FragmentHandle{_fs._reg, fid});
}
}
}
auto& fuid_open = m.registry()->ctx().get<Message::Components::OpenFragments>().fuid_open;
@ -209,6 +199,60 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
// on new and update: mark as fragment dirty
}
void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh) {
std::cout << "MFS: loadFragment\n";
const auto j = _fs.loadFromStorageNJ(fh);
if (!j.is_array()) {
// wrong data
return;
}
for (const auto& j_entry : j) {
auto new_real_msg = Message3Handle{reg, reg.create()};
// load into staging reg
for (const auto& [k, v] : j_entry.items()) {
std::cout << "K:" << k << " V:" << v.dump() << "\n";
const auto type_id = entt::hashed_string(k.data(), k.size());
const auto deserl_fn_it = _sc._deserl_json.find(type_id);
if (deserl_fn_it != _sc._deserl_json.cend()) {
try {
if (!deserl_fn_it->second(_sc, new_real_msg, v)) {
std::cerr << "MFS error: failed deserializing '" << k << "'\n";
}
} catch(...) {
std::cerr << "MFS error: failed deserializing (threw) '" << k << "'\n";
}
} else {
std::cerr << "MFS warning: missing deserializer for meta key '" << k << "'\n";
}
}
new_real_msg.emplace_or_replace<Message::Components::FUID>(fh.get<FragComp::ID>());
// TODO: dup checking
const bool is_dup {false};
// dup check (hacky, specific to protocols)
if (is_dup) {
// -> merge with preexisting
// -> throw update
reg.destroy(new_real_msg);
//_rmm.throwEventUpdate(reg, new_real_msg);
} else {
if (!new_real_msg.all_of<Message::Components::Timestamp, Message::Components::ContactFrom, Message::Components::ContactTo>()) {
// does not have needed components to be stand alone
reg.destroy(new_real_msg);
std::cerr << "MFS warning: message with missing basic compoments\n";
continue;
}
// -> throw create
_rmm.throwEventConstruct(reg, new_real_msg);
}
}
}
MessageFragmentStore::MessageFragmentStore(
Contact3Registry& cr,
RegistryMessageModel& rmm,
@ -250,6 +294,8 @@ MessageFragmentStore::MessageFragmentStore(
_sc.registerDeSerializerJson<Message::Components::Transfer::FileInfoLocal>();
_sc.registerSerializerJson<Message::Components::Transfer::TagHaveAll>();
_sc.registerDeSerializerJson<Message::Components::Transfer::TagHaveAll>();
_fs.subscribe(this, FragmentStore_Event::fragment_construct);
}
MessageFragmentStore::~MessageFragmentStore(void) {
@ -302,7 +348,7 @@ float MessageFragmentStore::tick(float time_delta) {
//nlohmann::json::to_msgpack(j);
auto j_dump = j.dump(2, ' ', true);
if (_fs.syncToStorage(fid, reinterpret_cast<const uint8_t*>(j_dump.data()), j_dump.size())) {
std::cout << "MFS: dumped " << j_dump << "\n";
//std::cout << "MFS: dumped " << j_dump << "\n";
// succ
_fuid_save_queue.pop();
}
@ -327,3 +373,47 @@ bool MessageFragmentStore::onEvent(const Message::Events::MessageUpdated& e) {
// TODO: handle deletes? diff between unload?
bool MessageFragmentStore::onEvent(const Fragment::Events::FragmentConstruct& e) {
if (_fs_ignore_event) {
return false; // skip self
}
if (!e.e.all_of<FragComp::MessagesTSRange, FragComp::MessagesContact>()) {
return false; // not for us
}
// TODO: are we sure it is a *new* fragment?
//std::cout << "MFS: got frag for us!\n";
Contact3 frag_contact = entt::null;
{ // get contact
const auto& frag_contact_id = e.e.get<FragComp::MessagesContact>().id;
// TODO: id lookup table, this is very inefficent
for (const auto& [c_it, id_it] : _cr.view<Contact::Components::ID>().each()) {
if (frag_contact_id == id_it.data) {
//h.emplace_or_replace<Message::Components::ContactTo>(c_it);
//return true;
frag_contact = c_it;
break;
}
}
if (!_cr.valid(frag_contact)) {
// unkown contact
return false;
}
}
// only load if msg reg open
auto* msg_reg = static_cast<const RegistryMessageModel&>(_rmm).get(frag_contact);
if (msg_reg == nullptr) {
// msg reg not created yet
return false;
}
// TODO: should this be done async / on tick() instead of on event?
loadFragment(*msg_reg, e.e);
return false;
}

View File

@ -42,7 +42,7 @@ namespace Fragment::Components {
// on new message: assign fuid
// on new and update: mark as fragment dirty
// on delete: mark as fragment dirty?
class MessageFragmentStore : public RegistryMessageModelEventI {
class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentStoreEventI {
protected:
Contact3Registry& _cr;
RegistryMessageModel& _rmm;
@ -54,6 +54,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI {
void handleMessage(const Message3Handle& m);
void loadFragment(Message3Registry& reg, FragmentHandle fh);
struct QueueEntry final {
uint64_t ts_since_dirty{0};
std::vector<uint8_t> id;
@ -76,5 +78,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI {
protected: // rmm
bool onEvent(const Message::Events::MessageConstruct& e) override;
bool onEvent(const Message::Events::MessageUpdated& e) override;
protected: // fs
bool onEvent(const Fragment::Events::FragmentConstruct& e) override;
};

View File

@ -36,7 +36,7 @@ bool MessageSerializerCallbacks::component_emplace_or_replace_json<Message::Comp
return true;
}
const auto id = static_cast<std::vector<uint8_t>>(j);
const auto id = static_cast<std::vector<uint8_t>>(j.is_binary()?j:j["bytes"]);
// TODO: id lookup table, this is very inefficent
for (const auto& [c_it, id_it] : msc.cr.view<Contact::Components::ID>().each()) {
@ -79,7 +79,7 @@ bool MessageSerializerCallbacks::component_emplace_or_replace_json<Message::Comp
return true;
}
const auto id = static_cast<std::vector<uint8_t>>(j);
const auto id = static_cast<std::vector<uint8_t>>(j.is_binary()?j:j["bytes"]);
// TODO: id lookup table, this is very inefficent
for (const auto& [c_it, id_it] : msc.cr.view<Contact::Components::ID>().each()) {