forked from Green-Sky/tomato
message fragment meta is saved, but still empty data
This commit is contained in:
parent
2bc30ffcdc
commit
3d41eedf48
@ -1,11 +1,14 @@
|
|||||||
#include "./message_fragment_store.hpp"
|
#include "./message_fragment_store.hpp"
|
||||||
|
|
||||||
|
#include <solanaceae/util/utils.hpp>
|
||||||
|
|
||||||
#include <solanaceae/message3/components.hpp>
|
#include <solanaceae/message3/components.hpp>
|
||||||
|
|
||||||
#include <nlohmann/json.hpp>
|
#include <nlohmann/json.hpp>
|
||||||
|
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
static bool serl_json_msg_ts_range(void* comp, nlohmann::json& out) {
|
static bool serl_json_msg_ts_range(void* comp, nlohmann::json& out) {
|
||||||
if (comp == nullptr) {
|
if (comp == nullptr) {
|
||||||
@ -40,8 +43,8 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
|
|||||||
std::vector<uint8_t> fragment_uid;
|
std::vector<uint8_t> fragment_uid;
|
||||||
|
|
||||||
// first search for fragment where the ts falls into the range
|
// first search for fragment where the ts falls into the range
|
||||||
for (const auto& [tsrage, fid] : _fuid_open) {
|
for (const auto& [ts_begin, ts_end, fid] : _fuid_open) {
|
||||||
if (tsrage.ts_begin <= msg_ts && tsrage.ts_end >= msg_ts) {
|
if (ts_begin <= msg_ts && ts_end >= msg_ts) {
|
||||||
fragment_uid = fid;
|
fragment_uid = fid;
|
||||||
// TODO: check conditions for open here
|
// TODO: check conditions for open here
|
||||||
// TODO: mark msg (and frag?) dirty
|
// TODO: mark msg (and frag?) dirty
|
||||||
@ -50,32 +53,39 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
|
|||||||
|
|
||||||
// if it did not fit into an existing fragment, we next look for fragments that could be extended
|
// if it did not fit into an existing fragment, we next look for fragments that could be extended
|
||||||
if (fragment_uid.empty()) {
|
if (fragment_uid.empty()) {
|
||||||
for (const auto& [tsrage, fid] : _fuid_open) {
|
for (auto& [ts_begin, ts_end, fid] : _fuid_open) {
|
||||||
const int64_t frag_range = int64_t(tsrage.ts_end) - int64_t(tsrage.ts_begin);
|
const int64_t frag_range = int64_t(ts_end) - int64_t(ts_begin);
|
||||||
//constexpr static int64_t max_frag_ts_extent {1000*60*60};
|
//constexpr static int64_t max_frag_ts_extent {1000*60*60};
|
||||||
constexpr static int64_t max_frag_ts_extent {1000*60*3}; // 3min for testing
|
constexpr static int64_t max_frag_ts_extent {1000*60*3}; // 3min for testing
|
||||||
const int64_t possible_extention = max_frag_ts_extent - frag_range;
|
const int64_t possible_extention = max_frag_ts_extent - frag_range;
|
||||||
|
|
||||||
// which direction
|
// which direction
|
||||||
if ((tsrage.ts_begin - possible_extention) <= msg_ts) {
|
if ((ts_begin - possible_extention) <= msg_ts && ts_begin > msg_ts) {
|
||||||
fragment_uid = fid;
|
fragment_uid = fid;
|
||||||
auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fid));
|
auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fid));
|
||||||
assert(static_cast<bool>(fh));
|
assert(static_cast<bool>(fh));
|
||||||
|
|
||||||
|
std::cout << "MFS: extended begin from " << ts_begin << " to " << msg_ts << "\n";
|
||||||
|
|
||||||
// assuming ts range exists
|
// assuming ts range exists
|
||||||
auto& fts_comp = fh.get<FragComp::MessagesTSRange>();
|
auto& fts_comp = fh.get<FragComp::MessagesTSRange>();
|
||||||
fts_comp.begin = msg_ts; // extend into the past
|
fts_comp.begin = msg_ts; // extend into the past
|
||||||
|
ts_begin = msg_ts;
|
||||||
|
|
||||||
|
|
||||||
// TODO: check conditions for open here
|
// TODO: check conditions for open here
|
||||||
// TODO: mark msg (and frag?) dirty
|
// TODO: mark msg (and frag?) dirty
|
||||||
} else if ((tsrage.ts_end + possible_extention) >= msg_ts) {
|
} else if ((ts_end + possible_extention) >= msg_ts && ts_end < msg_ts) {
|
||||||
fragment_uid = fid;
|
fragment_uid = fid;
|
||||||
auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fid));
|
auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fid));
|
||||||
assert(static_cast<bool>(fh));
|
assert(static_cast<bool>(fh));
|
||||||
|
|
||||||
|
std::cout << "MFS: extended end from " << ts_end << " to " << msg_ts << "\n";
|
||||||
|
|
||||||
// assuming ts range exists
|
// assuming ts range exists
|
||||||
auto& fts_comp = fh.get<FragComp::MessagesTSRange>();
|
auto& fts_comp = fh.get<FragComp::MessagesTSRange>();
|
||||||
fts_comp.end = msg_ts; // extend into the future
|
fts_comp.end = msg_ts; // extend into the future
|
||||||
|
ts_end = msg_ts;
|
||||||
|
|
||||||
// TODO: check conditions for open here
|
// TODO: check conditions for open here
|
||||||
// TODO: mark msg (and frag?) dirty
|
// TODO: mark msg (and frag?) dirty
|
||||||
@ -85,14 +95,34 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
|
|||||||
|
|
||||||
// if its still not found, we need a new fragment
|
// if its still not found, we need a new fragment
|
||||||
if (fragment_uid.empty()) {
|
if (fragment_uid.empty()) {
|
||||||
|
const auto new_fid = _fs.newFragmentFile("test_message_store/", MetaFileType::TEXT_JSON);
|
||||||
|
auto fh = _fs.fragmentHandle(new_fid);
|
||||||
|
if (!static_cast<bool>(fh)) {
|
||||||
|
std::cout << "MFS error: failed to create new fragment for message\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
fragment_uid = fh.get<FragComp::ID>().v;
|
||||||
|
|
||||||
|
auto& new_ts_range = fh.emplace<FragComp::MessagesTSRange>();
|
||||||
|
new_ts_range.begin = msg_ts;
|
||||||
|
new_ts_range.end = msg_ts;
|
||||||
|
|
||||||
|
_fuid_open.emplace_back(OpenFrag{msg_ts, msg_ts, fragment_uid});
|
||||||
|
|
||||||
|
std::cout << "MFS: created new fragment " << bin2hex(fragment_uid) << "\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
// if this is still empty, something is very wrong and we exit here
|
// if this is still empty, something is very wrong and we exit here
|
||||||
if (fragment_uid.empty()) {
|
if (fragment_uid.empty()) {
|
||||||
|
std::cout << "MFS error: failed to find/create fragment for message\n";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
m.emplace<Message::Components::FUID>(fragment_uid);
|
m.emplace<Message::Components::FUID>(fragment_uid);
|
||||||
|
|
||||||
|
// in this case we know the fragment needs an update
|
||||||
|
_fuid_save_queue.push({Message::getTimeMS(), fragment_uid});
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: do we use fid?
|
// TODO: do we use fid?
|
||||||
@ -116,6 +146,21 @@ MessageFragmentStore::~MessageFragmentStore(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
float MessageFragmentStore::tick(float time_delta) {
|
float MessageFragmentStore::tick(float time_delta) {
|
||||||
|
// sync dirty fragments here
|
||||||
|
|
||||||
|
if (!_fuid_save_queue.empty()) {
|
||||||
|
const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id);
|
||||||
|
auto j = nlohmann::json::object();
|
||||||
|
// if save as binary
|
||||||
|
//nlohmann::json::to_msgpack(j);
|
||||||
|
auto j_dump = j.dump(2, ' ', true);
|
||||||
|
if (_fs.syncToStorage(fid, reinterpret_cast<const uint8_t*>(j_dump.data()), j_dump.size())) {
|
||||||
|
std::cout << "MFS: dumped " << j_dump << "\n";
|
||||||
|
// succ
|
||||||
|
_fuid_save_queue.pop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 1000.f*60.f*60.f;
|
return 1000.f*60.f*60.f;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -129,3 +174,5 @@ bool MessageFragmentStore::onEvent(const Message::Events::MessageUpdated& e) {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: handle deletes? diff between unload?
|
||||||
|
|
||||||
|
@ -4,13 +4,14 @@
|
|||||||
#include "./fragment_store_i.hpp"
|
#include "./fragment_store_i.hpp"
|
||||||
#include "./fragment_store.hpp"
|
#include "./fragment_store.hpp"
|
||||||
|
|
||||||
#include <cstdint>
|
|
||||||
#include <entt/entity/registry.hpp>
|
#include <entt/entity/registry.hpp>
|
||||||
#include <entt/container/dense_map.hpp>
|
#include <entt/container/dense_map.hpp>
|
||||||
|
|
||||||
#include <solanaceae/message3/registry_message_model.hpp>
|
#include <solanaceae/message3/registry_message_model.hpp>
|
||||||
|
|
||||||
#include <map>
|
#include <queue>
|
||||||
|
#include <vector>
|
||||||
|
#include <cstdint>
|
||||||
|
|
||||||
namespace Message::Components {
|
namespace Message::Components {
|
||||||
|
|
||||||
@ -41,14 +42,20 @@ class MessageFragmentStore : public RegistryMessageModelEventI {
|
|||||||
|
|
||||||
void handleMessage(const Message3Handle& m);
|
void handleMessage(const Message3Handle& m);
|
||||||
|
|
||||||
struct TSRange final {
|
struct OpenFrag final {
|
||||||
uint64_t ts_begin {0};
|
uint64_t ts_begin {0};
|
||||||
uint64_t ts_end {0};
|
uint64_t ts_end {0};
|
||||||
|
std::vector<uint8_t> uid;
|
||||||
};
|
};
|
||||||
// only contains fragments with <1024 messages and <28h tsrage
|
// only contains fragments with <1024 messages and <28h tsrage
|
||||||
std::map<TSRange, std::vector<uint8_t>> _fuid_open;
|
// TODO: this needs to move into the message reg
|
||||||
|
std::vector<OpenFrag> _fuid_open;
|
||||||
|
|
||||||
std::map<uint64_t, std::vector<uint8_t>> _fuid_save_queue;
|
struct QueueEntry final {
|
||||||
|
uint64_t ts_since_dirty{0};
|
||||||
|
std::vector<uint8_t> id;
|
||||||
|
};
|
||||||
|
std::queue<QueueEntry> _fuid_save_queue;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
MessageFragmentStore(
|
MessageFragmentStore(
|
||||||
|
@ -422,6 +422,7 @@ Screen* MainScreen::tick(float time_delta, bool& quit) {
|
|||||||
tdch.tick(time_delta); // compute
|
tdch.tick(time_delta); // compute
|
||||||
|
|
||||||
mts.iterate(); // compute
|
mts.iterate(); // compute
|
||||||
|
mfs.tick(time_delta); // TODO: use delta
|
||||||
|
|
||||||
_min_tick_interval = std::min<float>(
|
_min_tick_interval = std::min<float>(
|
||||||
// HACK: pow by 1.6 to increase 50 -> ~500 (~522)
|
// HACK: pow by 1.6 to increase 50 -> ~500 (~522)
|
||||||
|
Loading…
Reference in New Issue
Block a user