From bc5599a2308e993647269b7ec7cad9740f319b2d Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sat, 13 Jul 2024 13:52:43 +0200 Subject: [PATCH] refactor sending transfers the same way as receiving --- CMakeLists.txt | 3 + solanaceae/ngc_ft1_sha1/sending_transfers.cpp | 126 +++++++++++++++ solanaceae/ngc_ft1_sha1/sending_transfers.hpp | 67 ++++++++ solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp | 149 +++++------------- solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp | 26 +-- 5 files changed, 241 insertions(+), 130 deletions(-) create mode 100644 solanaceae/ngc_ft1_sha1/sending_transfers.cpp create mode 100644 solanaceae/ngc_ft1_sha1/sending_transfers.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 21ec247..e7d9a01 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -73,6 +73,9 @@ add_library(solanaceae_sha1_ngcft1 ./solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp ./solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp + ./solanaceae/ngc_ft1_sha1/sending_transfers.hpp + ./solanaceae/ngc_ft1_sha1/sending_transfers.cpp + ./solanaceae/ngc_ft1_sha1/receiving_transfers.hpp ./solanaceae/ngc_ft1_sha1/receiving_transfers.cpp diff --git a/solanaceae/ngc_ft1_sha1/sending_transfers.cpp b/solanaceae/ngc_ft1_sha1/sending_transfers.cpp new file mode 100644 index 0000000..beda79f --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/sending_transfers.cpp @@ -0,0 +1,126 @@ +#include "./sending_transfers.hpp" + +#include +#include + +void SendingTransfers::tick(float delta) { + for (auto peer_it = _data.begin(); peer_it != _data.end();) { + for (auto it = peer_it->second.begin(); it != peer_it->second.end();) { + it->second.time_since_activity += delta; + + // if we have not heard for 2min, timeout (lower level event on real timeout) + // TODO: do we really need this if we get events? + if (it->second.time_since_activity >= 120.f) { + std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n"; + assert(false); + it = peer_it->second.erase(it); + } else { + it++; + } + } + + if (peer_it->second.empty()) { + // cleanup unused peers too agressive? + peer_it = _data.erase(peer_it); + } else { + peer_it++; + } + } +} + +SendingTransfers::Entry& SendingTransfers::emplaceInfo(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Info& info) { + auto& ent = _data[combine_ids(group_number, peer_number)][transfer_id]; + ent.v = info; + return ent; +} + +SendingTransfers::Entry& SendingTransfers::emplaceChunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Chunk& chunk) { + assert(!containsPeerChunk(group_number, peer_number, chunk.content, chunk.chunk_index)); + auto& ent = _data[combine_ids(group_number, peer_number)][transfer_id]; + ent.v = chunk; + return ent; +} + +bool SendingTransfers::containsPeerTransfer(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id) const { + auto it = _data.find(combine_ids(group_number, peer_number)); + if (it == _data.end()) { + return false; + } + + return it->second.count(transfer_id); +} + +bool SendingTransfers::containsChunk(ObjectHandle o, size_t chunk_idx) const { + for (const auto& [_, p] : _data) { + for (const auto& [_2, v] : p) { + if (!v.isChunk()) { + continue; + } + + const auto& c = v.getChunk(); + if (c.content != o) { + continue; + } + + if (c.chunk_index == chunk_idx) { + return true; + } + } + } + + return false; +} + +bool SendingTransfers::containsPeerChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle o, size_t chunk_idx) const { + auto it = _data.find(combine_ids(group_number, peer_number)); + if (it == _data.end()) { + return false; + } + + for (const auto& [_, v] : it->second) { + if (!v.isChunk()) { + continue; + } + + const auto& c = v.getChunk(); + if (c.content != o) { + continue; + } + + if (c.chunk_index == chunk_idx) { + return true; + } + } + + return false; +} + +void SendingTransfers::removePeer(uint32_t group_number, uint32_t peer_number) { + _data.erase(combine_ids(group_number, peer_number)); +} + +void SendingTransfers::removePeerTransfer(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id) { + auto it = _data.find(combine_ids(group_number, peer_number)); + if (it == _data.end()) { + return; + } + + it->second.erase(transfer_id); +} + +size_t SendingTransfers::size(void) const { + size_t count {0}; + for (const auto& [_, p] : _data) { + count += p.size(); + } + return count; +} + +size_t SendingTransfers::sizePeer(uint32_t group_number, uint32_t peer_number) const { + auto it = _data.find(combine_ids(group_number, peer_number)); + if (it == _data.end()) { + return 0; + } + + return it->second.size(); +} diff --git a/solanaceae/ngc_ft1_sha1/sending_transfers.hpp b/solanaceae/ngc_ft1_sha1/sending_transfers.hpp new file mode 100644 index 0000000..bcf8645 --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/sending_transfers.hpp @@ -0,0 +1,67 @@ +#pragma once + +#include + +#include + +#include "./util.hpp" + +#include +#include +#include + +struct SendingTransfers { + struct Entry { + struct Info { + // copy of info data + // too large? + std::vector info_data; + }; + + struct Chunk { + ObjectHandle content; + size_t chunk_index; // <.< remove offset_into_file + //uint64_t offset_into_file; + // or data? + // if memmapped, this would be just a pointer + }; + + std::variant v; + + float time_since_activity {0.f}; + + bool isInfo(void) const { return std::holds_alternative(v); } + bool isChunk(void) const { return std::holds_alternative(v); } + + Info& getInfo(void) { return std::get(v); } + const Info& getInfo(void) const { return std::get(v); } + Chunk& getChunk(void) { return std::get(v); } + const Chunk& getChunk(void) const { return std::get(v); } + + }; + + // key is groupid + peerid + // TODO: replace with contact + entt::dense_map> _data; + + void tick(float delta); + + Entry& emplaceInfo(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Info& info); + Entry& emplaceChunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Chunk& chunk); + + bool containsPeer(uint32_t group_number, uint32_t peer_number) const { return _data.count(combine_ids(group_number, peer_number)); } + bool containsPeerTransfer(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id) const; + // less reliable, since we dont keep the list of chunk idecies + bool containsChunk(ObjectHandle o, size_t chunk_idx) const; + bool containsPeerChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle o, size_t chunk_idx) const; + + auto& getPeer(uint32_t group_number, uint32_t peer_number) { return _data.at(combine_ids(group_number, peer_number)); } + auto& getTransfer(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id) { return getPeer(group_number, peer_number).at(transfer_id); } + + void removePeer(uint32_t group_number, uint32_t peer_number); + void removePeerTransfer(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id); + + size_t size(void) const; + size_t sizePeer(uint32_t group_number, uint32_t peer_number) const; +}; + diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp index e58ed30..ab146f1 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp @@ -48,7 +48,7 @@ static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) { } } -void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash) { +void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle obj, const SHA1Digest& hash) { for (auto& [i_g, i_p, i_o, i_h, i_t] : _queue_requested_chunk) { // if already in queue if (i_g == group_number && i_p == peer_number && i_h == hash) { @@ -59,33 +59,17 @@ void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_numbe } // check for running transfer - if (_sending_transfers.count(combine_ids(group_number, peer_number))) { - for (const auto& [_, transfer] : _sending_transfers.at(combine_ids(group_number, peer_number))) { - if (std::holds_alternative(transfer.v)) { - // ignore info - continue; - } - - const auto& t_c = std::get(transfer.v); - - if (content != t_c.content) { - // ignore different content - continue; - } - - auto chunk_idx_vec = content.get().chunkIndices(hash); - - for (size_t idx : chunk_idx_vec) { - if (idx == t_c.chunk_index) { - // already sending - return; // skip - } - } + auto chunk_idx_vec = obj.get().chunkIndices(hash); + // list is 1 entry in 99% of cases + for (const size_t chunk_idx : chunk_idx_vec) { + if (_sending_transfers.containsPeerChunk(group_number, peer_number, obj, chunk_idx)) { + // already sending + return; // skip } } // not in queue yet - _queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, content, hash, 0.f)); + _queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, obj, hash, 0.f)); } void SHA1_NGCFT1::updateMessages(ObjectHandle ce) { @@ -253,28 +237,7 @@ float SHA1_NGCFT1::iterate(float delta) { { // timers // sending transfers - for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) { - for (auto it = peer_it->second.begin(); it != peer_it->second.end();) { - it->second.time_since_activity += delta; - - // if we have not heard for 2min, timeout (lower level event on real timeout) - // TODO: do we really need this if we get events? - if (it->second.time_since_activity >= 120.f) { - std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n"; - assert(false); - it = peer_it->second.erase(it); - } else { - it++; - } - } - - if (peer_it->second.empty()) { - // cleanup unused peers too agressive? - peer_it = _sending_transfers.erase(peer_it); - } else { - peer_it++; - } - } + _sending_transfers.tick(delta); // receiving transfers _receiving_transfers.tick(delta); @@ -378,10 +341,7 @@ float SHA1_NGCFT1::iterate(float delta) { // if we have not reached the total cap for transfers // count running transfers - size_t running_sending_transfer_count {0}; - for (const auto& [_, transfers] : _sending_transfers) { - running_sending_transfer_count += transfers.size(); - } + size_t running_sending_transfer_count {_sending_transfers.size()}; size_t running_receiving_transfer_count {_receiving_transfers.size()}; if (running_sending_transfer_count < _max_concurrent_out) { @@ -394,21 +354,7 @@ float SHA1_NGCFT1::iterate(float delta) { if (!chunk_idx_vec.empty()) { // check if already sending - bool already_sending_to_this_peer = false; - if (_sending_transfers.count(combine_ids(group_number, peer_number))) { - for (const auto& [_2, t] : _sending_transfers.at(combine_ids(group_number, peer_number))) { - if (std::holds_alternative(t.v)) { - const auto& v = std::get(t.v); - if (v.content == ce && v.chunk_index == chunk_idx_vec.front()) { - // already sending - already_sending_to_this_peer = true; - break; - } - } - } - } - - if (!already_sending_to_this_peer) { + if (!_sending_transfers.containsPeerChunk(group_number, peer_number, ce, chunk_idx_vec.front())) { const auto& info = ce.get(); uint8_t transfer_id {0}; @@ -419,10 +365,14 @@ float SHA1_NGCFT1::iterate(float delta) { chunkSize(info, chunk_idx_vec.front()), &transfer_id )) { - _sending_transfers - [combine_ids(group_number, peer_number)] - [transfer_id] // TODO: also save index? - .v = SendingTransfer::Chunk{ce, chunk_idx_vec.front()}; + _sending_transfers.emplaceChunk( + group_number, peer_number, + transfer_id, + SendingTransfers::Entry::Chunk{ + ce, + chunk_idx_vec.front() + } + ); } } // else just remove from queue } @@ -649,9 +599,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { return false; } - auto content = _info_to_content.at(info_hash); + auto o = _info_to_content.at(info_hash); - if (!content.all_of()) { + if (!o.all_of()) { // we dont have the info for that infohash (yet?) return false; } @@ -663,14 +613,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { e.group_number, e.peer_number, static_cast(e.file_kind), e.file_id, e.file_id_size, - content.get().data.size(), + o.get().data.size(), &transfer_id ); - _sending_transfers - [combine_ids(e.group_number, e.peer_number)] - [transfer_id] - .v = SendingTransfer::Info{content.get().data}; + _sending_transfers.emplaceInfo( + e.group_number, e.peer_number, + transfer_id, + SendingTransfers::Entry::Info{ + o.get().data + } + ); const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround @@ -872,30 +825,20 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) { } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { - if (!_sending_transfers.count(combine_ids(e.group_number, e.peer_number))) { + if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) { return false; } - auto& peer = _sending_transfers.at(combine_ids(e.group_number, e.peer_number)); - - if (!peer.count(e.transfer_id)) { - return false; - } - - auto& transfer = peer.at(e.transfer_id); + auto& transfer = _sending_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id); transfer.time_since_activity = 0.f; - if (std::holds_alternative(transfer.v)) { - auto& info_transfer = std::get(transfer.v); + + if (transfer.isInfo()) { + auto& info_transfer = transfer.getInfo(); for (size_t i = 0; i < e.data_size && (i + e.data_offset) < info_transfer.info_data.size(); i++) { e.data[i] = info_transfer.info_data[i + e.data_offset]; } - - if (e.data_offset + e.data_size >= info_transfer.info_data.size()) { - // was last read (probably TODO: add transfer destruction event) - peer.erase(e.transfer_id); - } - } else if (std::holds_alternative(transfer.v)) { - auto& chunk_transfer = std::get(transfer.v); + } else if (transfer.isChunk()) { + auto& chunk_transfer = transfer.getChunk(); const auto& info = chunk_transfer.content.get(); // TODO: should we really use file? const auto data = chunk_transfer.content.get()->read( @@ -912,11 +855,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { // TODO: add event to propergate to messages //_rmm.throwEventUpdate(transfer); // should we? - //if (e.data_offset + e.data_size >= *insert chunk size here*) { - //// was last read (probably TODO: add transfer destruction event) - //peer.erase(e.transfer_id); - //} - auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); if (static_cast(c)) { chunk_transfer.content.get_or_emplace() @@ -1102,20 +1040,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) { - if (!_sending_transfers.count(combine_ids(e.group_number, e.peer_number))) { + if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) { return false; } - auto& peer_transfers = _sending_transfers.at(combine_ids(e.group_number, e.peer_number)); - if (!peer_transfers.count(e.transfer_id)) { - return false; + auto& transfer = _sending_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id); + + if (transfer.isChunk()) { + updateMessages(transfer.getChunk().content); // mostly for sent bytes } - const auto& tv = peer_transfers[e.transfer_id].v; - if (std::holds_alternative(tv)) { - updateMessages(std::get(tv).content); // mostly for sent bytes - } - peer_transfers.erase(e.transfer_id); + _sending_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id); return true; } diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp index 0878135..4876bf9 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp @@ -11,6 +11,7 @@ #include #include "./ft1_sha1_info.hpp" +#include "./sending_transfers.hpp" #include "./receiving_transfers.hpp" #include @@ -54,28 +55,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public //void queueUpRequestInfo(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash); void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash); - struct SendingTransfer { - struct Info { - // copy of info data - // too large? - std::vector info_data; - }; - - struct Chunk { - ObjectHandle content; - size_t chunk_index; // <.< remove offset_into_file - //uint64_t offset_into_file; - // or data? - // if memmapped, this would be just a pointer - }; - - std::variant v; - - float time_since_activity {0.f}; - }; - // key is groupid + peerid - entt::dense_map> _sending_transfers; - + SendingTransfers _sending_transfers; ReceivingTransfers _receiving_transfers; // makes request rotate around open content @@ -87,7 +67,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public }; std::deque _queue_send_bitset; - // workaround missing contact events + // FIXME: workaround missing contact events // only used to remove participation on peer exit entt::dense_map _tox_peer_to_contact;