refactor extract chunk picker systems
This commit is contained in:
parent
dd04e6131a
commit
ca89e43a40
@ -67,6 +67,9 @@ add_library(solanaceae_sha1_ngcft1
|
|||||||
./solanaceae/ngc_ft1_sha1/participation.hpp
|
./solanaceae/ngc_ft1_sha1/participation.hpp
|
||||||
./solanaceae/ngc_ft1_sha1/participation.cpp
|
./solanaceae/ngc_ft1_sha1/participation.cpp
|
||||||
|
|
||||||
|
./solanaceae/ngc_ft1_sha1/chunk_picker_systems.hpp
|
||||||
|
./solanaceae/ngc_ft1_sha1/chunk_picker_systems.cpp
|
||||||
|
|
||||||
./solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp
|
./solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp
|
||||||
./solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp
|
./solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp
|
||||||
|
|
||||||
|
@ -196,7 +196,7 @@ void ChunkPicker::updateParticipation(
|
|||||||
std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
||||||
Contact3Handle c,
|
Contact3Handle c,
|
||||||
ObjectRegistry& objreg,
|
ObjectRegistry& objreg,
|
||||||
ReceivingTransfers& rt,
|
const ReceivingTransfers& rt,
|
||||||
const size_t open_requests
|
const size_t open_requests
|
||||||
//const size_t flow_window
|
//const size_t flow_window
|
||||||
//NGCFT1& nft
|
//NGCFT1& nft
|
||||||
|
@ -68,7 +68,7 @@ struct ChunkPicker {
|
|||||||
[[nodiscard]] std::vector<ContentChunkR> updateChunkRequests(
|
[[nodiscard]] std::vector<ContentChunkR> updateChunkRequests(
|
||||||
Contact3Handle c,
|
Contact3Handle c,
|
||||||
ObjectRegistry& objreg,
|
ObjectRegistry& objreg,
|
||||||
ReceivingTransfers& rt,
|
const ReceivingTransfers& rt,
|
||||||
const size_t open_requests
|
const size_t open_requests
|
||||||
//const size_t flow_window
|
//const size_t flow_window
|
||||||
//NGCFT1& nft
|
//NGCFT1& nft
|
||||||
|
127
solanaceae/ngc_ft1_sha1/chunk_picker_systems.cpp
Normal file
127
solanaceae/ngc_ft1_sha1/chunk_picker_systems.cpp
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
#include "./chunk_picker_systems.hpp"
|
||||||
|
|
||||||
|
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
|
||||||
|
|
||||||
|
#include "./components.hpp"
|
||||||
|
#include "./chunk_picker.hpp"
|
||||||
|
#include "./contact_components.hpp"
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
namespace Systems {
|
||||||
|
|
||||||
|
void chunk_picker_updates(
|
||||||
|
Contact3Registry& cr,
|
||||||
|
ObjectRegistry& os_reg,
|
||||||
|
const entt::dense_map<Contact3, size_t>& peer_open_requests,
|
||||||
|
const ReceivingTransfers& receiving_transfers,
|
||||||
|
NGCFT1& nft, // TODO: remove this somehow
|
||||||
|
const float delta
|
||||||
|
) {
|
||||||
|
std::vector<Contact3Handle> cp_to_remove;
|
||||||
|
|
||||||
|
// first, update timers
|
||||||
|
cr.view<ChunkPickerTimer>().each([&cr, delta](const Contact3 cv, ChunkPickerTimer& cpt) {
|
||||||
|
cpt.timer -= delta;
|
||||||
|
if (cpt.timer <= 0.f) {
|
||||||
|
cr.emplace_or_replace<ChunkPickerUpdateTag>(cv);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
//std::cout << "number of chunkpickers: " << _cr.storage<ChunkPicker>().size() << ", of which " << _cr.storage<ChunkPickerUpdateTag>().size() << " need updating\n";
|
||||||
|
|
||||||
|
// now check for potentially missing cp
|
||||||
|
auto cput_view = cr.view<ChunkPickerUpdateTag>();
|
||||||
|
cput_view.each([&cr, &cp_to_remove](const Contact3 cv) {
|
||||||
|
Contact3Handle c{cr, cv};
|
||||||
|
|
||||||
|
//std::cout << "cput :)\n";
|
||||||
|
|
||||||
|
if (!c.any_of<Contact::Components::ToxGroupPeerEphemeral, Contact::Components::FT1Participation>()) {
|
||||||
|
std::cout << "cput uh nuh :(\n";
|
||||||
|
cp_to_remove.push_back(c);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!c.all_of<ChunkPicker>()) {
|
||||||
|
std::cout << "creating new cp!!\n";
|
||||||
|
c.emplace<ChunkPicker>();
|
||||||
|
c.emplace_or_replace<ChunkPickerTimer>();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// now update all cp that are tagged
|
||||||
|
cr.view<ChunkPicker, ChunkPickerUpdateTag>().each([&cr, &os_reg, &peer_open_requests, &receiving_transfers, &nft, &cp_to_remove](const Contact3 cv, ChunkPicker& cp) {
|
||||||
|
Contact3Handle c{cr, cv};
|
||||||
|
|
||||||
|
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral, Contact::Components::FT1Participation>()) {
|
||||||
|
cp_to_remove.push_back(c);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
//std::cout << "cpu :)\n";
|
||||||
|
|
||||||
|
// HACK: expensive, dont do every tick, only on events
|
||||||
|
// do verification in debug instead?
|
||||||
|
//cp.validateParticipation(c, _os.registry());
|
||||||
|
|
||||||
|
size_t peer_open_request = 0;
|
||||||
|
if (peer_open_requests.contains(c)) {
|
||||||
|
peer_open_request += peer_open_requests.at(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto new_requests = cp.updateChunkRequests(
|
||||||
|
c,
|
||||||
|
os_reg,
|
||||||
|
receiving_transfers,
|
||||||
|
peer_open_request
|
||||||
|
);
|
||||||
|
|
||||||
|
if (new_requests.empty()) {
|
||||||
|
// updateChunkRequests updates the unfinished
|
||||||
|
// TODO: pull out and check there?
|
||||||
|
if (cp.participating_unfinished.empty()) {
|
||||||
|
std::cout << "destroying empty useless cp\n";
|
||||||
|
cp_to_remove.push_back(c);
|
||||||
|
} else {
|
||||||
|
// most likely will have something soon
|
||||||
|
// TODO: mark dirty on have instead?
|
||||||
|
c.get_or_emplace<ChunkPickerTimer>().timer = 10.f;
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(c.all_of<Contact::Components::ToxGroupPeerEphemeral>());
|
||||||
|
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
|
||||||
|
|
||||||
|
for (const auto [r_o, r_idx] : new_requests) {
|
||||||
|
auto& cc = r_o.get<Components::FT1ChunkSHA1Cache>();
|
||||||
|
const auto& info = r_o.get<Components::FT1InfoSHA1>();
|
||||||
|
|
||||||
|
// request chunk_idx
|
||||||
|
nft.NGC_FT1_send_request_private(
|
||||||
|
group_number, peer_number,
|
||||||
|
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
|
||||||
|
info.chunks.at(r_idx).data.data(), info.chunks.at(r_idx).size()
|
||||||
|
);
|
||||||
|
std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(r_idx) << "] from " << group_number << ":" << peer_number << "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
// force update every minute
|
||||||
|
// TODO: add small random bias to spread load
|
||||||
|
c.get_or_emplace<ChunkPickerTimer>().timer = 60.f;
|
||||||
|
});
|
||||||
|
|
||||||
|
// unmark all marked
|
||||||
|
cr.clear<ChunkPickerUpdateTag>();
|
||||||
|
assert(cr.storage<ChunkPickerUpdateTag>().empty());
|
||||||
|
|
||||||
|
for (const auto& c : cp_to_remove) {
|
||||||
|
c.remove<ChunkPicker, ChunkPickerTimer>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // Systems
|
||||||
|
|
22
solanaceae/ngc_ft1_sha1/chunk_picker_systems.hpp
Normal file
22
solanaceae/ngc_ft1_sha1/chunk_picker_systems.hpp
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <solanaceae/contact/contact_model3.hpp>
|
||||||
|
#include <solanaceae/object_store/object_store.hpp>
|
||||||
|
#include <solanaceae/tox_contacts/components.hpp>
|
||||||
|
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||||
|
|
||||||
|
#include "./receiving_transfers.hpp"
|
||||||
|
|
||||||
|
namespace Systems {
|
||||||
|
|
||||||
|
void chunk_picker_updates(
|
||||||
|
Contact3Registry& cr,
|
||||||
|
ObjectRegistry& os_reg,
|
||||||
|
const entt::dense_map<Contact3, size_t>& peer_open_requests,
|
||||||
|
const ReceivingTransfers& receiving_transfers,
|
||||||
|
NGCFT1& nft, // TODO: remove this somehow
|
||||||
|
const float delta
|
||||||
|
);
|
||||||
|
|
||||||
|
} // Systems
|
||||||
|
|
@ -23,6 +23,7 @@
|
|||||||
#include "./chunk_picker.hpp"
|
#include "./chunk_picker.hpp"
|
||||||
#include "./participation.hpp"
|
#include "./participation.hpp"
|
||||||
|
|
||||||
|
#include "./chunk_picker_systems.hpp"
|
||||||
#include "./transfer_stats_systems.hpp"
|
#include "./transfer_stats_systems.hpp"
|
||||||
|
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
@ -248,7 +249,7 @@ float SHA1_NGCFT1::iterate(float delta) {
|
|||||||
_info_builder_queue.clear();
|
_info_builder_queue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
entt::dense_map<Contact3, size_t> _peer_open_requests;
|
entt::dense_map<Contact3, size_t> peer_open_requests;
|
||||||
|
|
||||||
{ // timers
|
{ // timers
|
||||||
// sending transfers
|
// sending transfers
|
||||||
@ -309,7 +310,7 @@ float SHA1_NGCFT1::iterate(float delta) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
{ // requested chunk timers
|
{ // requested chunk timers
|
||||||
_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta, &_peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
|
_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta, &peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
|
||||||
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
|
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
|
||||||
it->second.timer += delta;
|
it->second.timer += delta;
|
||||||
|
|
||||||
@ -317,7 +318,7 @@ float SHA1_NGCFT1::iterate(float delta) {
|
|||||||
if (it->second.timer >= 60.f) {
|
if (it->second.timer >= 60.f) {
|
||||||
it = ftchunk_requested.chunks.erase(it);
|
it = ftchunk_requested.chunks.erase(it);
|
||||||
} else {
|
} else {
|
||||||
_peer_open_requests[it->second.c] += 1;
|
peer_open_requests[it->second.c] += 1;
|
||||||
it++;
|
it++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -464,115 +465,21 @@ float SHA1_NGCFT1::iterate(float delta) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ran regardless of _max_concurrent_in
|
// ran regardless of _max_concurrent_in
|
||||||
{ // new chunk picker code
|
// new chunk picker code
|
||||||
std::vector<Contact3Handle> cp_to_remove;
|
// TODO: need to either split up or remove some things here
|
||||||
|
Systems::chunk_picker_updates(
|
||||||
// first, update timers
|
_cr,
|
||||||
_cr.view<ChunkPickerTimer>().each([this, delta](const Contact3 cv, ChunkPickerTimer& cpt) {
|
_os.registry(),
|
||||||
cpt.timer -= delta;
|
peer_open_requests,
|
||||||
if (cpt.timer <= 0.f) {
|
_receiving_transfers,
|
||||||
_cr.emplace_or_replace<ChunkPickerUpdateTag>(cv);
|
_nft,
|
||||||
}
|
delta
|
||||||
});
|
);
|
||||||
|
|
||||||
//std::cout << "number of chunkpickers: " << _cr.storage<ChunkPicker>().size() << ", of which " << _cr.storage<ChunkPickerUpdateTag>().size() << " need updating\n";
|
|
||||||
|
|
||||||
// now check for potentially missing cp
|
|
||||||
auto cput_view = _cr.view<ChunkPickerUpdateTag>();
|
|
||||||
cput_view.each([this, &cp_to_remove](const Contact3 cv) {
|
|
||||||
Contact3Handle c{_cr, cv};
|
|
||||||
|
|
||||||
//std::cout << "cput :)\n";
|
|
||||||
|
|
||||||
if (!c.any_of<Contact::Components::ToxGroupPeerEphemeral, Contact::Components::FT1Participation>()) {
|
|
||||||
std::cout << "cput uh nuh :(\n";
|
|
||||||
cp_to_remove.push_back(c);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!c.all_of<ChunkPicker>()) {
|
|
||||||
std::cout << "creating new cp!!\n";
|
|
||||||
c.emplace<ChunkPicker>();
|
|
||||||
c.emplace_or_replace<ChunkPickerTimer>();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// now update all cp that are tagged
|
|
||||||
_cr.view<ChunkPicker, ChunkPickerUpdateTag>().each([this, &_peer_open_requests, &cp_to_remove](const Contact3 cv, ChunkPicker& cp) {
|
|
||||||
Contact3Handle c{_cr, cv};
|
|
||||||
|
|
||||||
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral, Contact::Components::FT1Participation>()) {
|
|
||||||
cp_to_remove.push_back(c);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//std::cout << "cpu :)\n";
|
|
||||||
|
|
||||||
// HACK: expensive, dont do every tick, only on events
|
|
||||||
// do verification in debug instead?
|
|
||||||
//cp.validateParticipation(c, _os.registry());
|
|
||||||
|
|
||||||
size_t peer_open_request = 0;
|
|
||||||
if (_peer_open_requests.contains(c)) {
|
|
||||||
peer_open_request += _peer_open_requests.at(c);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto new_requests = cp.updateChunkRequests(
|
|
||||||
c,
|
|
||||||
_os.registry(),
|
|
||||||
_receiving_transfers,
|
|
||||||
peer_open_request
|
|
||||||
);
|
|
||||||
|
|
||||||
if (new_requests.empty()) {
|
|
||||||
// updateChunkRequests updates the unfinished
|
|
||||||
// TODO: pull out and check there?
|
|
||||||
if (cp.participating_unfinished.empty()) {
|
|
||||||
std::cout << "destroying empty useless cp\n";
|
|
||||||
cp_to_remove.push_back(c);
|
|
||||||
} else {
|
|
||||||
// most likely will have something soon
|
|
||||||
// TODO: mark dirty on have instead?
|
|
||||||
c.get_or_emplace<ChunkPickerTimer>().timer = 10.f;
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(c.all_of<Contact::Components::ToxGroupPeerEphemeral>());
|
|
||||||
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
|
|
||||||
|
|
||||||
for (const auto [r_o, r_idx] : new_requests) {
|
|
||||||
auto& cc = r_o.get<Components::FT1ChunkSHA1Cache>();
|
|
||||||
const auto& info = r_o.get<Components::FT1InfoSHA1>();
|
|
||||||
|
|
||||||
// request chunk_idx
|
|
||||||
_nft.NGC_FT1_send_request_private(
|
|
||||||
group_number, peer_number,
|
|
||||||
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
|
|
||||||
info.chunks.at(r_idx).data.data(), info.chunks.at(r_idx).size()
|
|
||||||
);
|
|
||||||
std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(r_idx) << "] from " << group_number << ":" << peer_number << "\n";
|
|
||||||
}
|
|
||||||
|
|
||||||
// force update every minute
|
|
||||||
// TODO: add small random bias to spread load
|
|
||||||
c.get_or_emplace<ChunkPickerTimer>().timer = 60.f;
|
|
||||||
});
|
|
||||||
|
|
||||||
// unmark all marked
|
|
||||||
_cr.clear<ChunkPickerUpdateTag>();
|
|
||||||
assert(_cr.storage<ChunkPickerUpdateTag>().empty());
|
|
||||||
|
|
||||||
for (const auto& c : cp_to_remove) {
|
|
||||||
c.remove<ChunkPicker, ChunkPickerTimer>();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// transfer statistics systems
|
// transfer statistics systems
|
||||||
Systems::transfer_tally_update(_os.registry(), getTimeNow());
|
Systems::transfer_tally_update(_os.registry(), getTimeNow());
|
||||||
|
|
||||||
if (_peer_open_requests.empty()) {
|
if (peer_open_requests.empty()) {
|
||||||
return 2.f;
|
return 2.f;
|
||||||
} else {
|
} else {
|
||||||
// pretty conservative and should be ajusted on a per peer, per delay basis
|
// pretty conservative and should be ajusted on a per peer, per delay basis
|
||||||
|
Loading…
Reference in New Issue
Block a user