From 294c5346cadd450ee181c8436411bb6c02dd582c Mon Sep 17 00:00:00 2001 From: Green Sky Date: Fri, 6 Dec 2024 22:41:05 +0100 Subject: [PATCH] hs2 send done, but untested --- solanaceae/ngc_hs2/ngc_hs2_send.cpp | 134 ++++++++++++++++++++++++---- solanaceae/ngc_hs2/ngc_hs2_send.hpp | 14 ++- 2 files changed, 126 insertions(+), 22 deletions(-) diff --git a/solanaceae/ngc_hs2/ngc_hs2_send.cpp b/solanaceae/ngc_hs2/ngc_hs2_send.cpp index ed255f8..64ca69e 100644 --- a/solanaceae/ngc_hs2/ngc_hs2_send.cpp +++ b/solanaceae/ngc_hs2/ngc_hs2_send.cpp @@ -25,15 +25,15 @@ namespace Components { -void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request) { +void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid) { // TODO: do more than exact dedupe - for (const auto& [ts_start, ts_end] : _queue) { - if (ts_start == new_request.ts_start && ts_end == new_request.ts_end) { + for (const auto& [time_range, _] : _queue) { + if (time_range.ts_start == new_request.ts_start && time_range.ts_end == new_request.ts_end) { return; // already enqueued } } - _queue.push_back(new_request); + _queue.emplace_back(Entry{new_request, std::vector{fid.cbegin(), fid.cend()}}); } } // Components @@ -65,7 +65,7 @@ float NGCHS2Send::iterate(float delta) { // limit how often we update here (new fts usually) if (_iterate_heat > 0.f) { _iterate_heat -= delta; - return 1000.f; + return 1000.f; // return heat? } else { _iterate_heat = _iterate_cooldown; } @@ -84,20 +84,77 @@ float NGCHS2Send::iterate(float delta) { continue; } - // new ft here? + // new ft here + // TODO: loop? nah just 1 per tick is enough + const auto request_entry = iirq._queue.front(); // copy + assert(!request_entry.fid.empty()); + + if (!c.all_of()) { + iirq._queue.pop_front(); + continue; // how + } + const Contact3Handle group_c = {*c.registry(), c.get().parent}; + if (!c.all_of()) { + iirq._queue.pop_front(); + continue; + } + const auto [group_number, peer_number] = c.get(); + + // TODO: check allowed range here + //_max_time_into_past_default + + // potentially heavy op + auto data = buildChatLogFileRange(group_c, request_entry.ir.ts_start, request_entry.ir.ts_end); + + uint8_t transfer_id {0}; + if (!_nft.NGC_FT1_send_init_private( + group_number, peer_number, + (uint32_t)NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK, + request_entry.fid.data(), request_entry.fid.size(), + data.size(), + &transfer_id, + true // can_compress (does nothing rn) + )) { + // sending failed, we do not pop but wait for next iterate + // TODO: cache data + // TODO: fail counter + // actually, fail probably means offline, so delete? + continue; + } + + assert(iirr._list.count(transfer_id) == 0); + iirr._list[transfer_id] = {request_entry.ir, data}; + + iirq._queue.pop_front(); } }; // first handle range requests on weak self - //for (auto&& [cv, iirq] : _cr.view().each()) { - fn_iirq(_cr.view()); + fn_iirq(_cr.view()); // we could stop here, if too much is already running // then range on others - //for (auto&& [cv, iirq] : _cr.view(entt::exclude_t{}).each()) { fn_iirq(_cr.view(entt::exclude_t{})); + _cr.view().each( + [delta](const auto cv, Components::IncommingTimeRangeRequestRunning& irr) { + std::vector to_remove; + for (auto&& [ft_id, entry] : irr._list) { + entry.last_activity += delta; + if (entry.last_activity >= 60.f) { + to_remove.push_back(ft_id); + } + } + for (const auto it : to_remove) { + std::cout << "NGCHS2Send warning: timed out ." << (int)it << "\n"; + // TODO: need a way to tell ft? + irr._list.erase(it); + // technically we are not supposed to timeout and instead rely on the done event + } + } + ); + return 1000.f; } @@ -150,10 +207,10 @@ void NGCHS2Send::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_req // dedupe insert into queue // how much overlap do we allow? - c.get_or_emplace().queueRequest({ - ts_start, - ts_end, - }); + c.get_or_emplace().queueRequest( + {ts_start, ts_end}, + fid + ); } #if 0 @@ -199,7 +256,7 @@ void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv } #endif -std::vector NGCHS2Send::buildHSFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end) { +std::vector NGCHS2Send::buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end) { const Message3Registry* reg_ptr = static_cast(_rmm).get(c); if (reg_ptr == nullptr) { return {}; @@ -358,11 +415,52 @@ bool NGCHS2Send::onEvent(const Events::NGCFT1_recv_request& e) { return true; } -bool NGCHS2Send::onEvent(const Events::NGCFT1_send_data&) { - return false; +bool NGCHS2Send::onEvent(const Events::NGCFT1_send_data& e) { + auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); + if (!c) { + return false; + } + + if (!c.all_of()) { + return false; + } + + auto& irr = c.get(); + if (!irr._list.count(e.transfer_id)) { + return false; // not for us (maybe) + } + + auto& transfer = irr._list.at(e.transfer_id); + if (transfer.data.size() < e.data_offset+e.data_size) { + std::cerr << "NGCHS2Send error: ft send data larger then file???\n"; + assert(false && "how"); + } + std::memcpy(e.data, transfer.data.data()+e.data_offset, e.data_size); + transfer.last_activity = 0.f; + + return true; } -bool NGCHS2Send::onEvent(const Events::NGCFT1_send_done&) { - return false; +bool NGCHS2Send::onEvent(const Events::NGCFT1_send_done& e) { + auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); + if (!c) { + return false; + } + + if (!c.all_of()) { + return false; + } + + auto& irr = c.get(); + if (!irr._list.count(e.transfer_id)) { + return false; // not for us (maybe) + } + + irr._list.erase(e.transfer_id); + + // TODO: check if we completed it + std::cout << "NGCHS2Send: sent chatlog to " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n"; + + return true; } diff --git a/solanaceae/ngc_hs2/ngc_hs2_send.hpp b/solanaceae/ngc_hs2/ngc_hs2_send.hpp index 40775d3..c1a0b1a 100644 --- a/solanaceae/ngc_hs2/ngc_hs2_send.hpp +++ b/solanaceae/ngc_hs2/ngc_hs2_send.hpp @@ -12,6 +12,7 @@ #include #include +#include // fwd class ToxContactModel2; @@ -25,17 +26,22 @@ struct TimeRangeRequest { // TODO: move to own file namespace Components { struct IncommingTimeRangeRequestQueue { - std::vector _queue; + struct Entry { + TimeRangeRequest ir; + std::vector fid; + }; + std::deque _queue; // we should remove/notadd queued requests // that are subsets of same or larger ranges - void queueRequest(const TimeRangeRequest& new_request); + void queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid); }; struct IncommingTimeRangeRequestRunning { struct Entry { TimeRangeRequest ir; - std::vector data; // trasfer data in memory + std::vector data; // transfer data in memory + float last_activity {0.f}; }; entt::dense_map _list; }; @@ -81,7 +87,7 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI { // msg reg contact // time ranges - std::vector buildHSFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end); + [[nodiscard]] std::vector buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end); protected: bool onEvent(const Message::Events::MessageConstruct&) override;