refactor saving and save on exit

This commit is contained in:
Green Sky 2024-03-03 15:16:01 +01:00
parent bc22451524
commit 88ea3e177d
No known key found for this signature in database
2 changed files with 80 additions and 62 deletions

View File

@ -336,7 +336,7 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh
}
if (reg.valid(dup_msg)) {
// -> merge with preexisting
// -> merge with preexisting (needs to be order independent)
// -> throw update
reg.destroy(new_real_msg);
//_rmm.throwEventUpdate(reg, new_real_msg);
@ -354,6 +354,75 @@ void MessageFragmentStore::loadFragment(Message3Registry& reg, FragmentHandle fh
}
}
bool MessageFragmentStore::syncFragToStorage(FragmentHandle fh, Message3Registry& reg) {
auto& ftsrange = fh.get_or_emplace<FragComp::MessagesTSRange>(Message::getTimeMS(), Message::getTimeMS());
auto j = nlohmann::json::array();
// TODO: does every message have ts?
auto msg_view = reg.view<Message::Components::Timestamp>();
// we also assume all messages have fid
for (auto it = msg_view.rbegin(), it_end = msg_view.rend(); it != it_end; it++) {
const Message3 m = *it;
if (!reg.all_of<Message::Components::FID, Message::Components::ContactFrom, Message::Components::ContactTo>(m)) {
continue;
}
// require msg for now
if (!reg.any_of<Message::Components::MessageText/*, Message::Components::Transfer::FileInfo*/>(m)) {
continue;
}
if (_fuid_save_queue.front().id != reg.get<Message::Components::FID>(m).fid) {
continue; // not ours
}
{ // potentially adjust tsrange (some external processes can change timestamps)
const auto msg_ts = msg_view.get<Message::Components::Timestamp>(m).ts;
if (ftsrange.begin > msg_ts) {
ftsrange.begin = msg_ts;
} else if (ftsrange.end < msg_ts) {
ftsrange.end = msg_ts;
}
}
auto& j_entry = j.emplace_back(nlohmann::json::object());
for (const auto& [type_id, storage] : reg.storage()) {
if (!storage.contains(m)) {
continue;
}
//std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n";
// use type_id to find serializer
auto s_cb_it = _sc._serl_json.find(type_id);
if (s_cb_it == _sc._serl_json.end()) {
// could not find serializer, not saving
//std::cout << "missing " << storage.type().name() << "(" << type_id << ")\n";
continue;
}
s_cb_it->second(_sc, {reg, m}, j_entry[storage.type().name()]);
}
}
// we cant skip if array is empty (in theory it will not be empty later on)
// if save as binary
//nlohmann::json::to_msgpack(j);
auto j_dump = j.dump(2, ' ', true);
if (_fs.syncToStorage(fh, reinterpret_cast<const uint8_t*>(j_dump.data()), j_dump.size())) {
//std::cout << "MFS: dumped " << j_dump << "\n";
// succ
return true;
}
// TODO: error
return false;
}
MessageFragmentStore::MessageFragmentStore(
Contact3Registry& cr,
RegistryMessageModel& rmm,
@ -372,7 +441,13 @@ MessageFragmentStore::MessageFragmentStore(
}
MessageFragmentStore::~MessageFragmentStore(void) {
// TODO: sync all dirty fragments
while (!_fuid_save_queue.empty()) {
auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id);
auto* reg = _fuid_save_queue.front().reg;
assert(reg != nullptr);
syncFragToStorage(fh, *reg);
_fuid_save_queue.pop(); // pop unconditionally
}
}
MessageSerializerCallbacks& MessageFragmentStore::getMSC(void) {
@ -444,66 +519,7 @@ float MessageFragmentStore::tick(float time_delta) {
auto fh = _fs.fragmentHandle(_fuid_save_queue.front().id);
auto* reg = _fuid_save_queue.front().reg;
assert(reg != nullptr);
auto& ftsrange = fh.get_or_emplace<FragComp::MessagesTSRange>(Message::getTimeMS(), Message::getTimeMS());
auto j = nlohmann::json::array();
// TODO: does every message have ts?
auto msg_view = reg->view<Message::Components::Timestamp>();
// we also assume all messages have fuid (hack: call handle when not?)
for (auto it = msg_view.rbegin(), it_end = msg_view.rend(); it != it_end; it++) {
const Message3 m = *it;
if (!reg->all_of<Message::Components::FID, Message::Components::ContactFrom, Message::Components::ContactTo>(m)) {
continue;
}
// require msg for now
if (!reg->any_of<Message::Components::MessageText/*, Message::Components::Transfer::FileInfo*/>(m)) {
continue;
}
if (_fuid_save_queue.front().id != reg->get<Message::Components::FID>(m).fid) {
continue; // not ours
}
{ // potentially adjust tsrange (some external processes can change timestamps)
const auto msg_ts = msg_view.get<Message::Components::Timestamp>(m).ts;
if (ftsrange.begin > msg_ts) {
ftsrange.begin = msg_ts;
} else if (ftsrange.end < msg_ts) {
ftsrange.end = msg_ts;
}
}
auto& j_entry = j.emplace_back(nlohmann::json::object());
for (const auto& [type_id, storage] : reg->storage()) {
if (!storage.contains(m)) {
continue;
}
//std::cout << "storage type: type_id:" << type_id << " name:" << storage.type().name() << "\n";
// use type_id to find serializer
auto s_cb_it = _sc._serl_json.find(type_id);
if (s_cb_it == _sc._serl_json.end()) {
// could not find serializer, not saving
//std::cout << "missing " << storage.type().name() << "(" << type_id << ")\n";
continue;
}
s_cb_it->second(_sc, {*reg, m}, j_entry[storage.type().name()]);
}
}
// if save as binary
//nlohmann::json::to_msgpack(j);
auto j_dump = j.dump(2, ' ', true);
if (_fs.syncToStorage(fh, reinterpret_cast<const uint8_t*>(j_dump.data()), j_dump.size())) {
//std::cout << "MFS: dumped " << j_dump << "\n";
// succ
if (syncFragToStorage(fh, *reg)) {
_fuid_save_queue.pop();
}
}

View File

@ -87,6 +87,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public FragmentS
void loadFragment(Message3Registry& reg, FragmentHandle fh);
bool syncFragToStorage(FragmentHandle fh, Message3Registry& reg);
struct SaveQueueEntry final {
uint64_t ts_since_dirty{0};
//std::vector<uint8_t> id;