From ca89e43a4085816c5cf3d49036ceda9c3a92bc52 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sat, 13 Jul 2024 12:36:49 +0200 Subject: [PATCH] refactor extract chunk picker systems --- CMakeLists.txt | 3 + solanaceae/ngc_ft1_sha1/chunk_picker.cpp | 2 +- solanaceae/ngc_ft1_sha1/chunk_picker.hpp | 2 +- .../ngc_ft1_sha1/chunk_picker_systems.cpp | 127 ++++++++++++++++++ .../ngc_ft1_sha1/chunk_picker_systems.hpp | 22 +++ solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp | 123 +++-------------- 6 files changed, 169 insertions(+), 110 deletions(-) create mode 100644 solanaceae/ngc_ft1_sha1/chunk_picker_systems.cpp create mode 100644 solanaceae/ngc_ft1_sha1/chunk_picker_systems.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index b4b8a0b..21ec247 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -67,6 +67,9 @@ add_library(solanaceae_sha1_ngcft1 ./solanaceae/ngc_ft1_sha1/participation.hpp ./solanaceae/ngc_ft1_sha1/participation.cpp + ./solanaceae/ngc_ft1_sha1/chunk_picker_systems.hpp + ./solanaceae/ngc_ft1_sha1/chunk_picker_systems.cpp + ./solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp ./solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp index 3c389ea..9c88b72 100644 --- a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp +++ b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp @@ -196,7 +196,7 @@ void ChunkPicker::updateParticipation( std::vector ChunkPicker::updateChunkRequests( Contact3Handle c, ObjectRegistry& objreg, - ReceivingTransfers& rt, + const ReceivingTransfers& rt, const size_t open_requests //const size_t flow_window //NGCFT1& nft diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.hpp b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp index 5ca9bf3..d0d026c 100644 --- a/solanaceae/ngc_ft1_sha1/chunk_picker.hpp +++ b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp @@ -68,7 +68,7 @@ struct ChunkPicker { [[nodiscard]] std::vector updateChunkRequests( Contact3Handle c, ObjectRegistry& objreg, - ReceivingTransfers& rt, + const ReceivingTransfers& rt, const size_t open_requests //const size_t flow_window //NGCFT1& nft diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker_systems.cpp b/solanaceae/ngc_ft1_sha1/chunk_picker_systems.cpp new file mode 100644 index 0000000..44f44fc --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/chunk_picker_systems.cpp @@ -0,0 +1,127 @@ +#include "./chunk_picker_systems.hpp" + +#include + +#include "./components.hpp" +#include "./chunk_picker.hpp" +#include "./contact_components.hpp" + +#include +#include + +namespace Systems { + +void chunk_picker_updates( + Contact3Registry& cr, + ObjectRegistry& os_reg, + const entt::dense_map& peer_open_requests, + const ReceivingTransfers& receiving_transfers, + NGCFT1& nft, // TODO: remove this somehow + const float delta +) { + std::vector cp_to_remove; + + // first, update timers + cr.view().each([&cr, delta](const Contact3 cv, ChunkPickerTimer& cpt) { + cpt.timer -= delta; + if (cpt.timer <= 0.f) { + cr.emplace_or_replace(cv); + } + }); + + //std::cout << "number of chunkpickers: " << _cr.storage().size() << ", of which " << _cr.storage().size() << " need updating\n"; + + // now check for potentially missing cp + auto cput_view = cr.view(); + cput_view.each([&cr, &cp_to_remove](const Contact3 cv) { + Contact3Handle c{cr, cv}; + + //std::cout << "cput :)\n"; + + if (!c.any_of()) { + std::cout << "cput uh nuh :(\n"; + cp_to_remove.push_back(c); + return; + } + + if (!c.all_of()) { + std::cout << "creating new cp!!\n"; + c.emplace(); + c.emplace_or_replace(); + } + }); + + // now update all cp that are tagged + cr.view().each([&cr, &os_reg, &peer_open_requests, &receiving_transfers, &nft, &cp_to_remove](const Contact3 cv, ChunkPicker& cp) { + Contact3Handle c{cr, cv}; + + if (!c.all_of()) { + cp_to_remove.push_back(c); + return; + } + + //std::cout << "cpu :)\n"; + + // HACK: expensive, dont do every tick, only on events + // do verification in debug instead? + //cp.validateParticipation(c, _os.registry()); + + size_t peer_open_request = 0; + if (peer_open_requests.contains(c)) { + peer_open_request += peer_open_requests.at(c); + } + + auto new_requests = cp.updateChunkRequests( + c, + os_reg, + receiving_transfers, + peer_open_request + ); + + if (new_requests.empty()) { + // updateChunkRequests updates the unfinished + // TODO: pull out and check there? + if (cp.participating_unfinished.empty()) { + std::cout << "destroying empty useless cp\n"; + cp_to_remove.push_back(c); + } else { + // most likely will have something soon + // TODO: mark dirty on have instead? + c.get_or_emplace().timer = 10.f; + } + + return; + } + + assert(c.all_of()); + const auto [group_number, peer_number] = c.get(); + + for (const auto [r_o, r_idx] : new_requests) { + auto& cc = r_o.get(); + const auto& info = r_o.get(); + + // request chunk_idx + nft.NGC_FT1_send_request_private( + group_number, peer_number, + static_cast(NGCFT1_file_kind::HASH_SHA1_CHUNK), + info.chunks.at(r_idx).data.data(), info.chunks.at(r_idx).size() + ); + std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(r_idx) << "] from " << group_number << ":" << peer_number << "\n"; + } + + // force update every minute + // TODO: add small random bias to spread load + c.get_or_emplace().timer = 60.f; + }); + + // unmark all marked + cr.clear(); + assert(cr.storage().empty()); + + for (const auto& c : cp_to_remove) { + c.remove(); + } +} + +} // Systems + diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker_systems.hpp b/solanaceae/ngc_ft1_sha1/chunk_picker_systems.hpp new file mode 100644 index 0000000..2da7897 --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/chunk_picker_systems.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include +#include +#include +#include + +#include "./receiving_transfers.hpp" + +namespace Systems { + +void chunk_picker_updates( + Contact3Registry& cr, + ObjectRegistry& os_reg, + const entt::dense_map& peer_open_requests, + const ReceivingTransfers& receiving_transfers, + NGCFT1& nft, // TODO: remove this somehow + const float delta +); + +} // Systems + diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp index e032e6e..e58ed30 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp @@ -23,6 +23,7 @@ #include "./chunk_picker.hpp" #include "./participation.hpp" +#include "./chunk_picker_systems.hpp" #include "./transfer_stats_systems.hpp" #include @@ -248,7 +249,7 @@ float SHA1_NGCFT1::iterate(float delta) { _info_builder_queue.clear(); } - entt::dense_map _peer_open_requests; + entt::dense_map peer_open_requests; { // timers // sending transfers @@ -309,7 +310,7 @@ float SHA1_NGCFT1::iterate(float delta) { } } { // requested chunk timers - _os.registry().view().each([delta, &_peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) { + _os.registry().view().each([delta, &peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) { for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) { it->second.timer += delta; @@ -317,7 +318,7 @@ float SHA1_NGCFT1::iterate(float delta) { if (it->second.timer >= 60.f) { it = ftchunk_requested.chunks.erase(it); } else { - _peer_open_requests[it->second.c] += 1; + peer_open_requests[it->second.c] += 1; it++; } } @@ -464,115 +465,21 @@ float SHA1_NGCFT1::iterate(float delta) { } // ran regardless of _max_concurrent_in - { // new chunk picker code - std::vector cp_to_remove; - - // first, update timers - _cr.view().each([this, delta](const Contact3 cv, ChunkPickerTimer& cpt) { - cpt.timer -= delta; - if (cpt.timer <= 0.f) { - _cr.emplace_or_replace(cv); - } - }); - - //std::cout << "number of chunkpickers: " << _cr.storage().size() << ", of which " << _cr.storage().size() << " need updating\n"; - - // now check for potentially missing cp - auto cput_view = _cr.view(); - cput_view.each([this, &cp_to_remove](const Contact3 cv) { - Contact3Handle c{_cr, cv}; - - //std::cout << "cput :)\n"; - - if (!c.any_of()) { - std::cout << "cput uh nuh :(\n"; - cp_to_remove.push_back(c); - return; - } - - if (!c.all_of()) { - std::cout << "creating new cp!!\n"; - c.emplace(); - c.emplace_or_replace(); - } - }); - - // now update all cp that are tagged - _cr.view().each([this, &_peer_open_requests, &cp_to_remove](const Contact3 cv, ChunkPicker& cp) { - Contact3Handle c{_cr, cv}; - - if (!c.all_of()) { - cp_to_remove.push_back(c); - return; - } - - //std::cout << "cpu :)\n"; - - // HACK: expensive, dont do every tick, only on events - // do verification in debug instead? - //cp.validateParticipation(c, _os.registry()); - - size_t peer_open_request = 0; - if (_peer_open_requests.contains(c)) { - peer_open_request += _peer_open_requests.at(c); - } - - auto new_requests = cp.updateChunkRequests( - c, - _os.registry(), - _receiving_transfers, - peer_open_request - ); - - if (new_requests.empty()) { - // updateChunkRequests updates the unfinished - // TODO: pull out and check there? - if (cp.participating_unfinished.empty()) { - std::cout << "destroying empty useless cp\n"; - cp_to_remove.push_back(c); - } else { - // most likely will have something soon - // TODO: mark dirty on have instead? - c.get_or_emplace().timer = 10.f; - } - - return; - } - - assert(c.all_of()); - const auto [group_number, peer_number] = c.get(); - - for (const auto [r_o, r_idx] : new_requests) { - auto& cc = r_o.get(); - const auto& info = r_o.get(); - - // request chunk_idx - _nft.NGC_FT1_send_request_private( - group_number, peer_number, - static_cast(NGCFT1_file_kind::HASH_SHA1_CHUNK), - info.chunks.at(r_idx).data.data(), info.chunks.at(r_idx).size() - ); - std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(r_idx) << "] from " << group_number << ":" << peer_number << "\n"; - } - - // force update every minute - // TODO: add small random bias to spread load - c.get_or_emplace().timer = 60.f; - }); - - // unmark all marked - _cr.clear(); - assert(_cr.storage().empty()); - - for (const auto& c : cp_to_remove) { - c.remove(); - } - } + // new chunk picker code + // TODO: need to either split up or remove some things here + Systems::chunk_picker_updates( + _cr, + _os.registry(), + peer_open_requests, + _receiving_transfers, + _nft, + delta + ); // transfer statistics systems Systems::transfer_tally_update(_os.registry(), getTimeNow()); - if (_peer_open_requests.empty()) { + if (peer_open_requests.empty()) { return 2.f; } else { // pretty conservative and should be ajusted on a per peer, per delay basis