From 33560f8f8aed61eadbc2ca223f4d749400e844d1 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sun, 30 Jun 2024 14:03:06 +0200 Subject: [PATCH] receiving transfers refactor --- CMakeLists.txt | 3 + .../ngc_ft1_sha1/receiving_transfers.cpp | 122 ++++++++++++++ .../ngc_ft1_sha1/receiving_transfers.hpp | 63 ++++++++ solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp | 151 +++++++----------- solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp | 23 +-- 5 files changed, 252 insertions(+), 110 deletions(-) create mode 100644 solanaceae/ngc_ft1_sha1/receiving_transfers.cpp create mode 100644 solanaceae/ngc_ft1_sha1/receiving_transfers.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1b5a949..5b2c67d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -62,6 +62,9 @@ add_library(solanaceae_sha1_ngcft1 ./solanaceae/ngc_ft1_sha1/participation.hpp ./solanaceae/ngc_ft1_sha1/participation.cpp + ./solanaceae/ngc_ft1_sha1/receiving_transfers.hpp + ./solanaceae/ngc_ft1_sha1/receiving_transfers.cpp + ./solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp ./solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp ) diff --git a/solanaceae/ngc_ft1_sha1/receiving_transfers.cpp b/solanaceae/ngc_ft1_sha1/receiving_transfers.cpp new file mode 100644 index 0000000..39edebc --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/receiving_transfers.cpp @@ -0,0 +1,122 @@ +#include "./receiving_transfers.hpp" + +#include + +void ReceivingTransfers::tick(float delta) { + for (auto peer_it = _data.begin(); peer_it != _data.end();) { + for (auto it = peer_it->second.begin(); it != peer_it->second.end();) { + it->second.time_since_activity += delta; + + // if we have not heard for 20sec, timeout + if (it->second.time_since_activity >= 20.f) { + std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n"; + // TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading + //it->second.v + it = peer_it->second.erase(it); + } else { + it++; + } + } + + if (peer_it->second.empty()) { + // cleanup unused peers too agressive? + peer_it = _data.erase(peer_it); + } else { + peer_it++; + } + } +} + +ReceivingTransfers::Entry& ReceivingTransfers::emplaceInfo(uint64_t combined_id, uint8_t transfer_id, const Entry::Info& info) { + auto& ent = _data[combined_id][transfer_id]; + ent.v = info; + return ent; +} + +ReceivingTransfers::Entry& ReceivingTransfers::emplaceChunk(uint64_t combined_id, uint8_t transfer_id, const Entry::Chunk& chunk) { + assert(!chunk.chunk_indices.empty()); + assert(!containsPeerChunk(combined_id, chunk.content, chunk.chunk_indices.front())); + auto& ent = _data[combined_id][transfer_id]; + ent.v = chunk; + return ent; +} + +bool ReceivingTransfers::containsPeerTransfer(uint64_t combined_id, uint8_t transfer_id) const { + auto it = _data.find(combined_id); + if (it == _data.end()) { + return false; + } + + return it->second.count(transfer_id); +} + +bool ReceivingTransfers::containsChunk(ObjectHandle o, size_t chunk_idx) const { + for (const auto& [_, p] : _data) { + for (const auto& [_2, v] : p) { + if (!v.isChunk()) { + continue; + } + + const auto& c = v.getChunk(); + if (c.content != o) { + continue; + } + + for (const auto idx : c.chunk_indices) { + if (idx == chunk_idx) { + return true; + } + } + } + } + + return false; +} + +bool ReceivingTransfers::containsPeerChunk(uint64_t combined_id, ObjectHandle o, size_t chunk_idx) const { + auto it = _data.find(combined_id); + if (it == _data.end()) { + return false; + } + + for (const auto& [_, v] : it->second) { + if (!v.isChunk()) { + continue; + } + + const auto& c = v.getChunk(); + if (c.content != o) { + continue; + } + + for (const auto idx : c.chunk_indices) { + if (idx == chunk_idx) { + return true; + } + } + } + + return false; +} + +void ReceivingTransfers::removePeer(uint64_t combined_id) { + _data.erase(combined_id); +} + +void ReceivingTransfers::removePeerTransfer(uint64_t combined_id, uint8_t transfer_id) { + auto it = _data.find(combined_id); + if (it == _data.end()) { + return; + } + + it->second.erase(transfer_id); +} + +size_t ReceivingTransfers::size(void) const { + size_t count {0}; + for (const auto& [_, p] : _data) { + count += p.size(); + } + return count; +} + diff --git a/solanaceae/ngc_ft1_sha1/receiving_transfers.hpp b/solanaceae/ngc_ft1_sha1/receiving_transfers.hpp new file mode 100644 index 0000000..49dc6e6 --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/receiving_transfers.hpp @@ -0,0 +1,63 @@ +#pragma once + +#include + +#include + +#include +#include +#include + +struct ReceivingTransfers { + struct Entry { + struct Info { + ObjectHandle content; + // copy of info data + // too large? + std::vector info_data; + }; + + struct Chunk { + ObjectHandle content; + std::vector chunk_indices; + // or data? + // if memmapped, this would be just a pointer + }; + + std::variant v; + + float time_since_activity {0.f}; + + bool isInfo(void) const { return std::holds_alternative(v); } + bool isChunk(void) const { return std::holds_alternative(v); } + + Info& getInfo(void) { return std::get(v); } + const Info& getInfo(void) const { return std::get(v); } + Chunk& getChunk(void) { return std::get(v); } + const Chunk& getChunk(void) const { return std::get(v); } + }; + + // key is groupid + peerid + // TODO: replace with contact + //using ReceivingTransfers = entt::dense_map>; + entt::dense_map> _data; + + void tick(float delta); + + Entry& emplaceInfo(uint64_t combined_id, uint8_t transfer_id, const Entry::Info& info); + Entry& emplaceChunk(uint64_t combined_id, uint8_t transfer_id, const Entry::Chunk& chunk); + + bool containsPeer(uint64_t combined_id) const { return _data.count(combined_id); } + bool containsPeerTransfer(uint64_t combined_id, uint8_t transfer_id) const; + bool containsChunk(ObjectHandle o, size_t chunk_idx) const; + bool containsPeerChunk(uint64_t combined_id, ObjectHandle o, size_t chunk_idx) const; + + auto& getPeer(uint64_t combined_id) { return _data.at(combined_id); } + auto& getTransfer(uint64_t combined_id, uint8_t transfer_id) { return getPeer(combined_id).at(transfer_id); } + + void removePeer(uint64_t combined_id); + void removePeerTransfer(uint64_t combined_id, uint8_t transfer_id); + + size_t size(void) const; +}; + diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp index e3872f2..f8b7dd6 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp @@ -256,28 +256,7 @@ void SHA1_NGCFT1::iterate(float delta) { } // receiving transfers - for (auto peer_it = _receiving_transfers.begin(); peer_it != _receiving_transfers.end();) { - for (auto it = peer_it->second.begin(); it != peer_it->second.end();) { - it->second.time_since_activity += delta; - - // if we have not heard for 10sec, timeout - if (it->second.time_since_activity >= 20.f) { - std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n"; - // TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading - //it->second.v - it = peer_it->second.erase(it); - } else { - it++; - } - } - - if (peer_it->second.empty()) { - // cleanup unused peers too agressive? - peer_it = _receiving_transfers.erase(peer_it); - } else { - peer_it++; - } - } + _receiving_transfers.tick(delta); // queued requests for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) { @@ -330,10 +309,7 @@ void SHA1_NGCFT1::iterate(float delta) { for (const auto& [_, transfers] : _sending_transfers) { running_sending_transfer_count += transfers.size(); } - size_t running_receiving_transfer_count {0}; - for (const auto& [_, transfers] : _receiving_transfers) { - running_receiving_transfer_count += transfers.size(); - } + size_t running_receiving_transfer_count {_receiving_transfers.size()}; if (running_sending_transfer_count < _max_concurrent_out) { // TODO: for each peer? transfer cap per peer? @@ -722,10 +698,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) { return false; } - _receiving_transfers - [combineIds(e.group_number, e.peer_number)] - [e.transfer_id] - .v = ReceivingTransfer::Info{ce, std::vector(e.file_size)}; + _receiving_transfers.emplaceInfo( + combineIds(e.group_number, e.peer_number), + e.transfer_id, + {ce, std::vector(e.file_size)} + ); e.accept = true; } else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) { @@ -765,10 +742,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) { // TODO: check e.file_size assert(e.file_size == info.chunkSize(idx_vec.front())); - _receiving_transfers - [combineIds(e.group_number, e.peer_number)] - [e.transfer_id] - .v = ReceivingTransfer::Chunk{ce, idx_vec}; + _receiving_transfers.emplaceChunk( + combineIds(e.group_number, e.peer_number), + e.transfer_id, + ReceivingTransfers::Entry::Chunk{ce, idx_vec} + ); e.accept = true; @@ -781,31 +759,28 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) { } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) { - if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) { + if (!_receiving_transfers.containsPeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id)) { return false; } - auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number)); - if (!peer_transfers.count(e.transfer_id)) { - return false; - } + auto& transfer = _receiving_transfers.getTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id); - auto& tv = peer_transfers[e.transfer_id].v; - peer_transfers[e.transfer_id].time_since_activity = 0.f; - if (std::holds_alternative(tv)) { - auto& info_data = std::get(tv).info_data; + transfer.time_since_activity = 0.f; + if (transfer.isInfo()) { + auto& info_data = transfer.getInfo().info_data; for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) { info_data[i+e.data_offset] = e.data[i]; } - } else if (std::holds_alternative(tv)) { - auto ce = std::get(tv).content; + } else if (transfer.isChunk()) { + auto o = transfer.getChunk().content; - assert(ce.all_of()); - auto* file = ce.get().get(); + assert(o.all_of()); + auto* file = o.get().get(); assert(file != nullptr); - for (const auto chunk_index : std::get(tv).chunk_indices) { - const auto offset_into_file = chunk_index* ce.get().chunk_size; + const auto chunk_size = o.get().chunk_size; + for (const auto chunk_index : transfer.getChunk().chunk_indices) { + const auto offset_into_file = chunk_index * chunk_size; if (!file->write({e.data, e.data_size}, offset_into_file + e.data_offset)) { std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n"; @@ -871,72 +846,68 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { - if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) { + if (!_receiving_transfers.containsPeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id)) { return false; } - auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number)); - if (!peer_transfers.count(e.transfer_id)) { - return false; - } + auto& transfer = _receiving_transfers.getTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id); - const auto& tv = peer_transfers[e.transfer_id].v; - if (std::holds_alternative(tv)) { - auto& info = std::get(tv); - auto ce = info.content; + if (transfer.isInfo()) { + auto& info = transfer.getInfo(); + auto o = info.content; - if (ce.any_of()) { + if (o.any_of()) { // we already have the info, discard - peer_transfers.erase(e.transfer_id); + _receiving_transfers.removePeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id); return true; } // check if data matches hash auto hash = hash_sha1(info.info_data.data(), info.info_data.size()); - assert(ce.all_of()); - if (ce.get().hash != hash) { + assert(o.all_of()); + if (o.get().hash != hash) { std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n"; - // requeue info request - peer_transfers.erase(e.transfer_id); + // TODO: requeue info request; eg manipulate o.get(); + _receiving_transfers.removePeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id); return true; } - const auto& info_data = ce.emplace_or_replace(std::move(info.info_data)).data; - auto& ft_info = ce.emplace_or_replace(); + const auto& info_data = o.emplace_or_replace(std::move(info.info_data)).data; + auto& ft_info = o.emplace_or_replace(); ft_info.fromBuffer(info_data); { // file info // TODO: not overwrite fi? since same? - auto& file_info = ce.emplace_or_replace(); + auto& file_info = o.emplace_or_replace(); file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size}; file_info.total_size = ft_info.file_size; } std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n"; - ce.remove(); - if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) { + o.remove(); + if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), o); it != _queue_content_want_info.end()) { _queue_content_want_info.erase(it); } - ce.emplace_or_replace(); + o.emplace_or_replace(); - updateMessages(ce); - } else if (std::holds_alternative(tv)) { - auto ce = std::get(tv).content; - const auto& info = ce.get(); - auto& cc = ce.get(); + updateMessages(o); + } else if (transfer.isChunk()) { + auto o = transfer.getChunk().content; + const auto& info = o.get(); + auto& cc = o.get(); // HACK: only check first chunk (they *should* all be the same) - const auto chunk_index = std::get(tv).chunk_indices.front(); + const auto chunk_index = transfer.getChunk().chunk_indices.front(); const uint64_t offset_into_file = chunk_index * uint64_t(info.chunk_size); assert(chunk_index < info.chunks.size()); const auto chunk_size = info.chunkSize(chunk_index); assert(offset_into_file+chunk_size <= info.file_size); - const auto chunk_data = ce.get()->read(chunk_size, offset_into_file); + const auto chunk_data = o.get()->read(chunk_size, offset_into_file); assert(!chunk_data.empty()); // check hash of chunk @@ -945,7 +916,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n"; if (!cc.have_all) { - for (const auto inner_chunk_index : std::get(tv).chunk_indices) { + for (const auto inner_chunk_index : transfer.getChunk().chunk_indices) { if (!cc.have_all && !cc.have_chunk[inner_chunk_index]) { cc.have_chunk.set(inner_chunk_index); cc.have_count += 1; @@ -962,33 +933,33 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { // HACK: remap file, to clear ram // TODO: error checking - ce.get() = std::make_unique( - ce.get().file_list.front(), + o.get() = std::make_unique( + o.get().file_list.front(), info.file_size ); } // good chunk // TODO: have wasted + metadata - ce.get_or_emplace().total += chunk_data.size; + o.get_or_emplace().total += chunk_data.size; } } // queue chunk have for all participants // HACK: send immediatly to all participants - for (const auto c_part : ce.get().participants) { + for (const auto c_part : o.get().participants) { if (!_cr.all_of(c_part)) { continue; } const auto [part_group_number, part_peer_number] = _cr.get(c_part); - const auto& info_hash = ce.get().hash; + const auto& info_hash = o.get().hash; // convert size_t to uint32_t const std::vector chunk_indices { - std::get(tv).chunk_indices.cbegin(), - std::get(tv).chunk_indices.cend() + transfer.getChunk().chunk_indices.cbegin(), + transfer.getChunk().chunk_indices.cend() }; _neep.send_ft1_have( @@ -1019,14 +990,14 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { // remove from requested // TODO: remove at init and track running transfers differently - for (const auto it : std::get(tv).chunk_indices) { - ce.get_or_emplace().chunks.erase(it); + for (const auto it : transfer.getChunk().chunk_indices) { + o.get_or_emplace().chunks.erase(it); } - updateMessages(ce); // mostly for received bytes + updateMessages(o); // mostly for received bytes } - peer_transfers.erase(e.transfer_id); + _receiving_transfers.removePeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id); return true; } @@ -1468,6 +1439,8 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) { } } + // TODO: nfcft1 should have fired receive/send done events for all them running transfers + return false; } diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp index 8fa1038..993df4b 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp @@ -10,6 +10,7 @@ #include #include "./ft1_sha1_info.hpp" +#include "./receiving_transfers.hpp" #include #include @@ -68,27 +69,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public // key is groupid + peerid entt::dense_map> _sending_transfers; - struct ReceivingTransfer { - struct Info { - ObjectHandle content; - // copy of info data - // too large? - std::vector info_data; - }; - - struct Chunk { - ObjectHandle content; - std::vector chunk_indices; - // or data? - // if memmapped, this would be just a pointer - }; - - std::variant v; - - float time_since_activity {0.f}; - }; - // key is groupid + peerid - entt::dense_map> _receiving_transfers; + ReceivingTransfers _receiving_transfers; // makes request rotate around open content std::deque _queue_content_want_info;