From 1bf1fbce7580acdac2ce834432d78863ccd62591 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Fri, 22 Nov 2024 13:51:25 +0100 Subject: [PATCH] small hs progress --- solanaceae/ngc_hs2/ngc_hs2_send.cpp | 167 ++++++++++++++++++++++++++-- solanaceae/ngc_hs2/ngc_hs2_send.hpp | 66 ++++++++++- 2 files changed, 218 insertions(+), 15 deletions(-) diff --git a/solanaceae/ngc_hs2/ngc_hs2_send.cpp b/solanaceae/ngc_hs2/ngc_hs2_send.cpp index 17a75d7..c83045e 100644 --- a/solanaceae/ngc_hs2/ngc_hs2_send.cpp +++ b/solanaceae/ngc_hs2/ngc_hs2_send.cpp @@ -6,8 +6,35 @@ #include +#include + // https://www.youtube.com/watch?v=AdAqsgga3qo +namespace Components { + +void IncommingInfoRequestQueue::queueRequest(const InfoRequest& new_request) { + // TODO: do more than exact dedupe + for (const auto& [ts_start, ts_end] : _queue) { + if (ts_start == new_request.ts_start && ts_end == new_request.ts_end) { + return; // already enqueued + } + } + + _queue.push_back(new_request); +} + +void IncommingMsgRequestQueue::queueRequest(const SingleMessageRequest& new_request) { + for (const auto& [ppk, mid, ts] : _queue) { + if (mid == new_request.mid && ts == new_request.ts && ppk == new_request.ppk) { + return; // already enqueued + } + } + _queue.push_back(new_request); +} + +} // Components + + NGCHS2Send::NGCHS2Send( Contact3Registry& cr, RegistryMessageModelI& rmm, @@ -34,41 +61,152 @@ NGCHS2Send::~NGCHS2Send(void) { } float NGCHS2Send::iterate(float delta) { + // limit how often we update here (new fts usually) + if (_iterate_heat > 0.f) { + _iterate_heat -= delta; + return 1000.f; + } else { + _iterate_heat = _iterate_cooldown; + } // work request queue // check if already running, discard + auto fn_iirq = [this](auto&& view) { + for (auto&& [cv, iirq] : view.each()) { + Contact3Handle c{_cr, cv}; + auto& iirr = c.get_or_emplace(); + + // dedup queued from running + + if (iirr._list.size() >= _max_parallel_per_peer) { + continue; + } + + // new ft here? + } + }; + + auto fn_imrq = [this](auto&& view) { + for (auto&& [cv, imrq] : view.each()) { + Contact3Handle c{_cr, cv}; + auto& imrr = c.get_or_emplace(); + + // dedup queued from running + + if (imrr._list.size() >= _max_parallel_per_peer) { + continue; + } + + // new ft here? + } + }; + + // first handle range requests on weak self + //for (auto&& [cv, iirq] : _cr.view().each()) { + fn_iirq(_cr.view()); + + // then handle messages on weak self + //for (auto&& [cv, imrq] : _cr.view().each()) { + fn_imrq(_cr.view()); + + // we could stop here, if too much is already running + + // then range on others + //for (auto&& [cv, iirq] : _cr.view(entt::exclude_t{}).each()) { + fn_iirq(_cr.view(entt::exclude_t{})); + + // then messages on others + //for (auto&& [cv, imrq] : _cr.view(entt::exclude_t{}).each()) { + fn_imrq(_cr.view(entt::exclude_t{})); + return 1000.f; } +template +static uint64_t deserlSimpleType(ByteSpan bytes) { + if (bytes.size < sizeof(Type)) { + throw int(1); + } + + Type value; + + for (size_t i = 0; i < sizeof(Type); i++) { + value |= Type(bytes[i]) << (i*8); + } + + return value; +} + +static uint32_t deserlMID(ByteSpan mid_bytes) { + return deserlSimpleType(mid_bytes); +} + +static uint64_t deserlTS(ByteSpan ts_bytes) { + return deserlSimpleType(ts_bytes); +} + void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) { ByteSpan fid{e.file_id, e.file_id_size}; + // TODO: better size check + if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) { + std::cerr << "NGCHS2S error: range not lange enough\n"; + return; + } + + // seconds + uint64_t ts_start{0}; + uint64_t ts_end{0}; + // parse - // - ts start - // - ts end + try { + ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)}; + ts_start = deserlTS(ts_start_bytes); + + ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)}; + ts_end = deserlTS(ts_end_bytes); + } catch (...) { + std::cerr << "NGCHS2S error: failed to parse range\n"; + return; + } // dedupe insert into queue + // how much overlap do we allow? + c.get_or_emplace().queueRequest({ + ts_start, + ts_end, + }); } void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) { ByteSpan fid{e.file_id, e.file_id_size}; + // TODO: better size check if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) { - // error + std::cerr << "NGCHS2S error: singlemessage not lange enough\n"; return; } + ByteSpan ppk; + uint32_t mid {0}; + uint64_t ts {0}; // deciseconds + // parse - // - ppk - // TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32) - ByteSpan ppk{fid.ptr, 32}; + try { + // - ppk + // TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32) + ppk = {fid.ptr, 32}; - // - mid - //static_assert(sizeof(Tox_Group_Message_Id) == sizeof(uint32_t)); - ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)}; + // - mid + ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)}; + mid = deserlMID(mid_bytes); - // - ts - // uint64_t (seconds? we dont want milliseconds - ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)}; + // - ts + ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)}; + ts = deserlTS(ts_bytes); + } catch (...) { + std::cerr << "NGCHS2S error: failed to parse singlemessage\n"; + return; + } // file content // - message type (text/textaction/file(ft1sha1)) @@ -81,6 +219,11 @@ void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv // for queue, we need group, peer, msg_ppk, msg_mid, msg_ts // dedupe insert into queue + c.get_or_emplace().queueRequest({ + ppk, + mid, + ts, + }); } bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) { diff --git a/solanaceae/ngc_hs2/ngc_hs2_send.hpp b/solanaceae/ngc_hs2/ngc_hs2_send.hpp index c117b94..aa8b86b 100644 --- a/solanaceae/ngc_hs2/ngc_hs2_send.hpp +++ b/solanaceae/ngc_hs2/ngc_hs2_send.hpp @@ -9,10 +9,60 @@ #include +#include + +#include + // fwd class ToxContactModel2; -// limit to 2 uploads per peer simultaniously + +struct InfoRequest { + uint64_t ts_start{0}; + uint64_t ts_end{0}; +}; + +struct SingleMessageRequest { + ByteSpan ppk; + uint32_t mid {0}; + uint64_t ts {0}; // deciseconds +}; + +// TODO: move to own file +namespace Components { + struct IncommingInfoRequestQueue { + std::vector _queue; + + // we should remove/notadd queued requests + // that are subsets of same or larger ranges + void queueRequest(const InfoRequest& new_request); + }; + + struct IncommingInfoRequestRunning { + struct Entry { + InfoRequest ir; + std::vector data; // trasfer data in memory + }; + entt::dense_map _list; + }; + + struct IncommingMsgRequestQueue { + // optimize dup lookups (this list could be large) + std::vector _queue; + + // removes dups + void queueRequest(const SingleMessageRequest& new_request); + }; + + struct IncommingMsgRequestRunning { + struct Entry { + SingleMessageRequest smr; + std::vector data; // trasfer data in memory + }; + // make more efficent? this list is very short + entt::dense_map _list; + }; +} // Components class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI { Contact3Registry& _cr; @@ -21,12 +71,22 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI { NGCFT1& _nft; NGCFT1EventProviderI::SubscriptionReference _nftep_sr; + float _iterate_heat {0.f}; + constexpr static float _iterate_cooldown {1.22f}; // sec + // open/running info requests (by c) + // comp on peer c // open/running info responses (by c) + // comp on peer c - static const bool _only_send_self_observed {true}; - static const int64_t _max_time_into_past_default {60}; // s + // limit to 2 uploads per peer simultaniously + // TODO: increase for prod (4?) + // currently per type + constexpr static size_t _max_parallel_per_peer {2}; + + constexpr static bool _only_send_self_observed {true}; + constexpr static int64_t _max_time_into_past_default {60*15}; // s public: NGCHS2Send(