diff --git a/src/fragment_store/message_fragment_store.cpp b/src/fragment_store/message_fragment_store.cpp index b4b4186e..d53f4415 100644 --- a/src/fragment_store/message_fragment_store.cpp +++ b/src/fragment_store/message_fragment_store.cpp @@ -22,8 +22,6 @@ namespace Message::Components { // ctx struct OpenFragments { struct OpenFrag final { - uint64_t ts_begin {0}; - uint64_t ts_end {0}; std::vector uid; }; // only contains fragments with <1024 messages and <28h tsrage @@ -93,9 +91,15 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { std::vector fragment_uid; // first search for fragment where the ts falls into the range - for (const auto& [ts_begin, ts_end, fid] : fuid_open) { - if (ts_begin <= msg_ts && ts_end >= msg_ts) { - fragment_uid = fid; + for (const auto& fuid : fuid_open) { + auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fuid.uid)); + assert(static_cast(fh)); + + // assuming ts range exists + auto& fts_comp = fh.get(); + + if (fts_comp.begin <= msg_ts && fts_comp.end >= msg_ts) { + fragment_uid = fuid.uid; // TODO: check conditions for open here // TODO: mark msg (and frag?) dirty } @@ -103,39 +107,37 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { // if it did not fit into an existing fragment, we next look for fragments that could be extended if (fragment_uid.empty()) { - for (auto& [ts_begin, ts_end, fid] : fuid_open) { - const int64_t frag_range = int64_t(ts_end) - int64_t(ts_begin); + for (const auto& fuid : fuid_open) { + auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fuid.uid)); + assert(static_cast(fh)); + + // assuming ts range exists + auto& fts_comp = fh.get(); + + const int64_t frag_range = int64_t(fts_comp.end) - int64_t(fts_comp.begin); constexpr static int64_t max_frag_ts_extent {1000*60*60}; //constexpr static int64_t max_frag_ts_extent {1000*60*3}; // 3min for testing const int64_t possible_extention = max_frag_ts_extent - frag_range; // which direction - if ((ts_begin - possible_extention) <= msg_ts && ts_begin > msg_ts) { - fragment_uid = fid; - auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fid)); - assert(static_cast(fh)); + if ((fts_comp.begin - possible_extention) <= msg_ts && fts_comp.begin > msg_ts) { + fragment_uid = fuid.uid; - std::cout << "MFS: extended begin from " << ts_begin << " to " << msg_ts << "\n"; + std::cout << "MFS: extended begin from " << fts_comp.begin << " to " << msg_ts << "\n"; // assuming ts range exists - auto& fts_comp = fh.get(); fts_comp.begin = msg_ts; // extend into the past - ts_begin = msg_ts; // TODO: check conditions for open here // TODO: mark msg (and frag?) dirty - } else if ((ts_end + possible_extention) >= msg_ts && ts_end < msg_ts) { - fragment_uid = fid; - auto fh = _fs.fragmentHandle(_fs.getFragmentByID(fid)); - assert(static_cast(fh)); + } else if ((fts_comp.end + possible_extention) >= msg_ts && fts_comp.end < msg_ts) { + fragment_uid = fuid.uid; - std::cout << "MFS: extended end from " << ts_end << " to " << msg_ts << "\n"; + std::cout << "MFS: extended end from " << fts_comp.end << " to " << msg_ts << "\n"; // assuming ts range exists - auto& fts_comp = fh.get(); fts_comp.end = msg_ts; // extend into the future - ts_end = msg_ts; // TODO: check conditions for open here // TODO: mark msg (and frag?) dirty @@ -157,7 +159,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { fh.emplace_or_replace().comp = Compression::ZSTD; fh.emplace_or_replace().comp = Compression::ZSTD; - auto& new_ts_range = fh.emplace(); + auto& new_ts_range = fh.emplace_or_replace(); new_ts_range.begin = msg_ts; new_ts_range.end = msg_ts; @@ -170,7 +172,7 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) { } } - fuid_open.emplace_back(Message::Components::OpenFragments::OpenFrag{msg_ts, msg_ts, fragment_uid}); + fuid_open.emplace_back(Message::Components::OpenFragments::OpenFrag{fragment_uid}); std::cout << "MFS: created new fragment " << bin2hex(fragment_uid) << "\n"; @@ -306,6 +308,10 @@ float MessageFragmentStore::tick(float time_delta) { const auto fid = _fs.getFragmentByID(_fuid_save_queue.front().id); auto* reg = _fuid_save_queue.front().reg; assert(reg != nullptr); + + auto fh = _fs.fragmentHandle(fid); + auto& ftsrange = fh.get_or_emplace(Message::getTimeMS(), Message::getTimeMS()); + auto j = nlohmann::json::array(); // TODO: does every message have ts? @@ -327,6 +333,15 @@ float MessageFragmentStore::tick(float time_delta) { continue; // not ours } + { // potentially adjust tsrange (some external processes can change timestamps) + const auto msg_ts = msg_view.get(m).ts; + if (ftsrange.begin > msg_ts) { + ftsrange.begin = msg_ts; + } else if (ftsrange.end < msg_ts) { + ftsrange.end = msg_ts; + } + } + auto& j_entry = j.emplace_back(nlohmann::json::object()); for (const auto& [type_id, storage] : reg->storage()) {