fix potential tsrange errors and deduplicate state

This commit is contained in:
Green Sky 2024-02-20 11:07:18 +01:00
parent 0896038dd6
commit 795ab2d4e1
No known key found for this signature in database

View File

@ -22,8 +22,6 @@ namespace Message::Components {
// ctx // ctx
struct OpenFragments { struct OpenFragments {
struct OpenFrag final { struct OpenFrag final {
uint64_t ts_begin {0};
uint64_t ts_end {0};
std::vector<uint8_t> uid; std::vector<uint8_t> uid;
}; };
// only contains fragments with <1024 messages and <28h tsrage // only contains fragments with <1024 messages and <28h tsrage
@ -93,9 +91,15 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
std::vector<uint8_t> fragment_uid; std::vector<uint8_t> fragment_uid;
// first search for fragment where the ts falls into the range // first search for fragment where the ts falls into the range
for (const auto& [ts_begin, ts_end, fid] : fuid_open) { for (const auto& fuid : fuid_open) {
if (ts_begin <= msg_ts && ts_end >= msg_ts) { auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fuid.uid));
fragment_uid = fid; assert(static_cast<bool>(fh));
// assuming ts range exists
auto& fts_comp = fh.get<FragComp::MessagesTSRange>();
if (fts_comp.begin <= msg_ts && fts_comp.end >= msg_ts) {
fragment_uid = fuid.uid;
// TODO: check conditions for open here // TODO: check conditions for open here
// TODO: mark msg (and frag?) dirty // TODO: mark msg (and frag?) dirty
} }
@ -103,39 +107,37 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
// if it did not fit into an existing fragment, we next look for fragments that could be extended // if it did not fit into an existing fragment, we next look for fragments that could be extended
if (fragment_uid.empty()) { if (fragment_uid.empty()) {
for (auto& [ts_begin, ts_end, fid] : fuid_open) { for (const auto& fuid : fuid_open) {
const int64_t frag_range = int64_t(ts_end) - int64_t(ts_begin); auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fuid.uid));
assert(static_cast<bool>(fh));
// assuming ts range exists
auto& fts_comp = fh.get<FragComp::MessagesTSRange>();
const int64_t frag_range = int64_t(fts_comp.end) - int64_t(fts_comp.begin);
constexpr static int64_t max_frag_ts_extent {1000*60*60}; constexpr static int64_t max_frag_ts_extent {1000*60*60};
//constexpr static int64_t max_frag_ts_extent {1000*60*3}; // 3min for testing //constexpr static int64_t max_frag_ts_extent {1000*60*3}; // 3min for testing
const int64_t possible_extention = max_frag_ts_extent - frag_range; const int64_t possible_extention = max_frag_ts_extent - frag_range;
// which direction // which direction
if ((ts_begin - possible_extention) <= msg_ts && ts_begin > msg_ts) { if ((fts_comp.begin - possible_extention) <= msg_ts && fts_comp.begin > msg_ts) {
fragment_uid = fid; fragment_uid = fuid.uid;
auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fid));
assert(static_cast<bool>(fh));
std::cout << "MFS: extended begin from " << ts_begin << " to " << msg_ts << "\n"; std::cout << "MFS: extended begin from " << fts_comp.begin << " to " << msg_ts << "\n";
// assuming ts range exists // assuming ts range exists
auto& fts_comp = fh.get<FragComp::MessagesTSRange>();
fts_comp.begin = msg_ts; // extend into the past fts_comp.begin = msg_ts; // extend into the past
ts_begin = msg_ts;
// TODO: check conditions for open here // TODO: check conditions for open here
// TODO: mark msg (and frag?) dirty // TODO: mark msg (and frag?) dirty
} else if ((ts_end + possible_extention) >= msg_ts && ts_end < msg_ts) { } else if ((fts_comp.end + possible_extention) >= msg_ts && fts_comp.end < msg_ts) {
fragment_uid = fid; fragment_uid = fuid.uid;
auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fid));
assert(static_cast<bool>(fh));
std::cout << "MFS: extended end from " << ts_end << " to " << msg_ts << "\n"; std::cout << "MFS: extended end from " << fts_comp.end << " to " << msg_ts << "\n";
// assuming ts range exists // assuming ts range exists
auto& fts_comp = fh.get<FragComp::MessagesTSRange>();
fts_comp.end = msg_ts; // extend into the future fts_comp.end = msg_ts; // extend into the future
ts_end = msg_ts;
// TODO: check conditions for open here // TODO: check conditions for open here
// TODO: mark msg (and frag?) dirty // TODO: mark msg (and frag?) dirty
@ -157,7 +159,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
fh.emplace_or_replace<FragComp::Ephemeral::MetaCompressionType>().comp = Compression::ZSTD; fh.emplace_or_replace<FragComp::Ephemeral::MetaCompressionType>().comp = Compression::ZSTD;
fh.emplace_or_replace<FragComp::DataCompressionType>().comp = Compression::ZSTD; fh.emplace_or_replace<FragComp::DataCompressionType>().comp = Compression::ZSTD;
auto& new_ts_range = fh.emplace<FragComp::MessagesTSRange>(); auto& new_ts_range = fh.emplace_or_replace<FragComp::MessagesTSRange>();
new_ts_range.begin = msg_ts; new_ts_range.begin = msg_ts;
new_ts_range.end = msg_ts; new_ts_range.end = msg_ts;
@ -170,7 +172,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
} }
} }
fuid_open.emplace_back(Message::Components::OpenFragments::OpenFrag{msg_ts, msg_ts, fragment_uid}); fuid_open.emplace_back(Message::Components::OpenFragments::OpenFrag{fragment_uid});
std::cout << "MFS: created new fragment " << bin2hex(fragment_uid) << "\n"; std::cout << "MFS: created new fragment " << bin2hex(fragment_uid) << "\n";
@ -306,6 +308,10 @@ float MessageFragmentStore::tick(float time_delta) {
const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id); const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id);
auto* reg = _fuid_save_queue.front().reg; auto* reg = _fuid_save_queue.front().reg;
assert(reg != nullptr); assert(reg != nullptr);
auto fh = _fs.fragmentHandle(fid);
auto& ftsrange = fh.get_or_emplace<FragComp::MessagesTSRange>(Message::getTimeMS(), Message::getTimeMS());
auto j = nlohmann::json::array(); auto j = nlohmann::json::array();
// TODO: does every message have ts? // TODO: does every message have ts?
@ -327,6 +333,15 @@ float MessageFragmentStore::tick(float time_delta) {
continue; // not ours continue; // not ours
} }
{ // potentially adjust tsrange (some external processes can change timestamps)
const auto msg_ts = msg_view.get<Message::Components::Timestamp>(m).ts;
if (ftsrange.begin > msg_ts) {
ftsrange.begin = msg_ts;
} else if (ftsrange.end < msg_ts) {
ftsrange.end = msg_ts;
}
}
auto& j_entry = j.emplace_back(nlohmann::json::object()); auto& j_entry = j.emplace_back(nlohmann::json::object());
for (const auto& [type_id, storage] : reg->storage()) { for (const auto& [type_id, storage] : reg->storage()) {