new chunk picker, basically working

still needs work on the sending side and more bug fixes
This commit is contained in:
Green Sky 2024-07-03 12:11:20 +02:00
parent 613b183592
commit d19fc6ba30
No known key found for this signature in database
4 changed files with 210 additions and 108 deletions

View File

@ -1,124 +1,169 @@
#include "./chunk_picker.hpp" #include "./chunk_picker.hpp"
#include <solanaceae/contact/contact_model3.hpp> #include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/object_store/object_store.hpp>
#include "./components.hpp" #include "./components.hpp"
#include <entt/container/dense_map.hpp>
#include <entt/container/dense_set.hpp>
#include <cstddef>
#include <cstdint>
#include <algorithm> #include <algorithm>
#include <iostream> #include <iostream>
// Rebuilds the participation bookkeeping of this picker from the object registry.
//
// Both `participating` and `participating_unfinished` are emptied and refilled:
// - every object carrying Components::SuspectedParticipants goes into `participating`
// - of those, objects that have both a chunk cache and the info, and whose
//   chunk cache is not `have_all`, also get a fresh ParticipationEntry in
//   `participating_unfinished`
//
// c:      contact handle passed in by the caller
//         NOTE(review): not read in this function, so objects are not filtered
//         by contact here -- confirm whether that is intended
// objreg: registry that is scanned for candidate objects
void ChunkPicker::updateParticipation(
	Contact3Handle c,
	ObjectRegistry& objreg
) {
	// start from scratch, previous entries are dropped
	participating.clear();
	participating_unfinished.clear();

	auto suspected = objreg.view<Components::SuspectedParticipants>();
	for (const Object obj : suspected) {
		const ObjectHandle handle {objreg, obj};

		participating.emplace(handle);

		// chunk cache and info are both required before completeness is looked at
		const bool has_chunk_meta = handle.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>();

		if (has_chunk_meta && !handle.get<Components::FT1ChunkSHA1Cache>().have_all) {
			// still missing chunks, track it with a default entry
			participating_unfinished.emplace(handle, ParticipationEntry{});
		}
	}
}
std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests( std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
Contact3Handle c, Contact3Handle c,
ObjectRegistry& objreg, ObjectRegistry& objreg,
ReceivingTransfers& rt ReceivingTransfers& rt
//NGCFT1& nft //NGCFT1& nft
) { ) {
if (!static_cast<bool>(c)) {
assert(false); return {};
}
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
assert(false); return {};
}
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
std::vector<ContentChunkR> req_ret; std::vector<ContentChunkR> req_ret;
// count running tf and open requests // count running tf and open requests
// TODO: implement const size_t num_ongoing_transfers = rt.sizePeer(group_number, peer_number);
const size_t num_requests = max_tf_chunk_requests; // TODO: account for open requests
// TODO: base max on rate(chunks per sec), gonna be ass with variable chunk size
const size_t num_requests = std::max<int64_t>(0, max_tf_chunk_requests-num_ongoing_transfers);
// while n < X // while n < X
while (false && !participating_unfinished.empty()) {
// round robin content (remember last obj) if (participating_unfinished.empty()) {
if (!objreg.valid(participating_in_last)) { participating_in_last = entt::null;
participating_in_last = participating_unfinished.begin()->first; return {};
//participating_in_last = *participating_unfinished.begin(); }
// round robin content (remember last obj)
if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
participating_in_last = participating_unfinished.begin()->first;
//participating_in_last = *participating_unfinished.begin();
}
assert(objreg.valid(participating_in_last));
auto it = participating_unfinished.find(participating_in_last);
// hard limit robin rounds to array size times 100
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*100; i++) {
if (it == participating_unfinished.end()) {
it = participating_unfinished.begin();
} }
assert(objreg.valid(participating_in_last));
auto it = participating_unfinished.find(participating_in_last); if (it->second.skips < it->second.should_skip) {
// hard limit robin rounds to array size time 100 it->second.skips++;
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*100; i++) { continue;
if (it == participating_unfinished.end()) { }
it = participating_unfinished.begin();
}
if (it->second.skips < it->second.should_skip) { ObjectHandle o {objreg, it->first};
it->second.skips++;
// intersect self have with other have
if (!o.all_of<Components::RemoteHave, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
// rare case where no one other has anything
continue;
}
const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
if (cc.have_all) {
std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n";
continue;
}
const auto& others_have = o.get<Components::RemoteHave>().others;
auto other_it = others_have.find(c);
if (other_it == others_have.end()) {
// rare case where the other is participating but has nothing
continue;
}
const auto& other_have = other_it->second;
BitSet chunk_candidates = cc.have_chunk;
if (!other_have.have_all) {
// AND is the same as ~(~A | ~B)
// that means we leave chunk_candidates as (have is inverted want)
// merge is or
// invert at the end
chunk_candidates
.merge(other_have.have.invert())
.invert();
// TODO: add intersect for more perf
} else {
chunk_candidates.invert();
}
const auto total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
auto& requested_chunks = o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
// TODO: trim off round up to 8, since they are now always set
// now select (globally) unrequested other have
// TODO: pick strategies
// TODO: how do we prioritize within a file?
// - first (walk from start (or readhead?))
// - random (choose random start pos and walk)
// - rarest (keep track of rarity and sort by that)
// - streaming (use read head to determine time critical chunks, potentially over requesting, first (relative to stream head) otherwise)
// maybe look into libtorrent's deadline stuff
// - arbitrary priority maps/functions (and combine with above in ratios)
// simple, we use first
for (size_t i = 0; i < total_chunks && req_ret.size() < num_requests && i < chunk_candidates.size_bits(); i++) {
if (!chunk_candidates[i]) {
continue; continue;
} }
ObjectHandle o {objreg, it->first}; // i is a candidate we can request form peer
// intersect self have with other have // first check against double requests
if (!o.all_of<Components::RemoteHave, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) { if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const auto& x) -> bool {
// rare case where no one other has anything return false;
}) != req_ret.cend()) {
// already in return array
// how did we get here? should we fast exit? if simple-first strat, we would want to
continue; // skip
}
// second check against global requests (this might differ based on strat)
if (requested_chunks.count(i) != 0) {
continue; continue;
} }
const auto& cc = o.get<Components::FT1ChunkSHA1Cache>(); // third we check against globally running transfers (this might differ based on strat)
if (cc.have_all) { if (rt.containsChunk(o, i)) {
std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n";
continue; continue;
} }
const auto& others_have = o.get<Components::RemoteHave>().others; // if nothing else blocks this, add to ret
auto other_it = others_have.find(c); req_ret.push_back(ContentChunkR{o, i});
if (other_it == others_have.end()) {
// rare case where the other is participating but has nothing
continue;
}
const auto& other_have = other_it->second; assert(requested_chunks.count(i) == 0);
requested_chunks[i] = 0.f;
BitSet chunk_candidates = cc.have_chunk;
if (!other_have.have_all) {
// AND is the same as ~(~A | ~B)
// that means we leave chunk_candidates as (have is inverted want)
// merge is or
// invert at the end
chunk_candidates
.merge(other_have.have.invert())
.invert();
// TODO: add intersect for more perf
} else {
chunk_candidates.invert();
}
const auto total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
// TODO: trim off round up to 8, since they are now always set
// now select (globally) unrequested other have
// TODO: pick strategies
// TODO: how do we prioritize within a file?
// - first (walk from start (or readhead?))
// - random (choose random start pos and walk)
// - rarest (keep track of rarity and sort by that)
// - streaming (use read head to determine time critical chunks, potentially over requesting, first (relative to stream head) otherwise)
// maybe look into libtorrent's deadline stuff
// - arbitrary priority maps/functions (and combine with above in ratios)
// simple, we use first
for (size_t i = 0; i < total_chunks && req_ret.size() < num_requests && i < chunk_candidates.size_bits(); i++) {
if (!chunk_candidates[i]) {
continue;
}
// i is a candidate we can request form peer
// first check against double requests
if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const auto& x) -> bool {
return false;
}) != req_ret.cend()) {
// already in return array
// how did we get here? should we fast exit? if simple-first strat, we would want to
continue; // skip
}
// TODO: also check against globally running transfers!!!
// if nothing else blocks this, add to ret
req_ret.push_back(ContentChunkR{o, i});
}
} }
} }

View File

@ -41,6 +41,11 @@ struct ChunkPicker {
entt::dense_set<Object> participating; entt::dense_set<Object> participating;
Object participating_in_last {entt::null}; Object participating_in_last {entt::null};
void updateParticipation(
Contact3Handle c,
ObjectRegistry& objreg
);
// tick // tick
//void sendInfoRequests(); //void sendInfoRequests();
// is this like a system? // is this like a system?
@ -54,7 +59,7 @@ struct ChunkPicker {
size_t chunk_index; size_t chunk_index;
}; };
// returns list of chunks to request // returns list of chunks to request
std::vector<ContentChunkR> updateChunkRequests( [[nodiscard]] std::vector<ContentChunkR> updateChunkRequests(
Contact3Handle c, Contact3Handle c,
ObjectRegistry& objreg, ObjectRegistry& objreg,
ReceivingTransfers& rt ReceivingTransfers& rt

View File

@ -45,7 +45,7 @@ static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
} }
void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash) { void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash) {
for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) { for (auto& [i_g, i_p, i_o, i_h, i_t] : _queue_requested_chunk) {
// if already in queue // if already in queue
if (i_g == group_number && i_p == peer_number && i_h == hash) { if (i_g == group_number && i_p == peer_number && i_h == hash) {
// update timer // update timer
@ -261,6 +261,7 @@ void SHA1_NGCFT1::iterate(float delta) {
float& timer = std::get<float>(*it); float& timer = std::get<float>(*it);
timer += delta; timer += delta;
// forget after 10sec
if (timer >= 10.f) { if (timer >= 10.f) {
it = _queue_requested_chunk.erase(it); it = _queue_requested_chunk.erase(it);
} else { } else {
@ -290,8 +291,8 @@ void SHA1_NGCFT1::iterate(float delta) {
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) { for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
it->second += delta; it->second += delta;
// 20sec, TODO: config // 15sec, TODO: config
if (it->second >= 20.f) { if (it->second >= 15.f) {
it = ftchunk_requested.chunks.erase(it); it = ftchunk_requested.chunks.erase(it);
} else { } else {
it++; it++;
@ -386,6 +387,7 @@ void SHA1_NGCFT1::iterate(float delta) {
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n"; std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
} }
#if 0
} else if (!_queue_content_want_chunk.empty()) { } else if (!_queue_content_want_chunk.empty()) {
const auto ce = _queue_content_want_chunk.front(); const auto ce = _queue_content_want_chunk.front();
@ -450,7 +452,47 @@ void SHA1_NGCFT1::iterate(float delta) {
} }
} }
} }
#endif
} }
// new chunk picker code
_cr.view<ChunkPicker>().each([this](const Contact3 cv, ChunkPicker& cp) {
Contact3Handle c{_cr, cv};
// HACK: expensive, dont do every tick, only on events
// do verification in debug instead?
cp.updateParticipation(
c,
_os.registry()
);
assert(!cp.participating.empty());
auto new_requests = cp.updateChunkRequests(
c,
_os.registry(),
_receiving_transfers
);
if (new_requests.empty()) {
return;
}
assert(c.all_of<Contact::Components::ToxGroupPeerEphemeral>());
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
for (const auto [r_o, r_idx] : new_requests) {
auto& cc = r_o.get<Components::FT1ChunkSHA1Cache>();
const auto& info = r_o.get<Components::FT1InfoSHA1>();
// request chunk_idx
_nft.NGC_FT1_send_request_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
info.chunks.at(r_idx).data.data(), info.chunks.at(r_idx).size()
);
std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(r_idx) << "] from " << group_number << ":" << peer_number << "\n";
}
});
} }
} }
@ -645,22 +687,22 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
return false; return false;
} }
auto ce = _chunks.at(chunk_hash); auto o = _chunks.at(chunk_hash);
{ // they advertise interest in the content { // they advertise interest in the content
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
addParticipation(c, ce); addParticipation(c, o);
} }
assert(ce.all_of<Components::FT1ChunkSHA1Cache>()); assert(o.all_of<Components::FT1ChunkSHA1Cache>());
if (!ce.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) { if (!o.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
// we dont have the chunk // we dont have the chunk
return false; return false;
} }
// queue good request // queue good request
queueUpRequestChunk(e.group_number, e.peer_number, ce, chunk_hash); queueUpRequestChunk(e.group_number, e.peer_number, o, chunk_hash);
} else { } else {
assert(false && "unhandled case"); assert(false && "unhandled case");
} }
@ -711,19 +753,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
return false; return false;
} }
auto ce = _chunks.at(sha1_chunk_hash); auto o = _chunks.at(sha1_chunk_hash);
// CHECK IF TRANSFER IN PROGESS!!
{ // they have the content (probably, might be fake, should move this to done) { // they have the content (probably, might be fake, should move this to done)
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
addParticipation(c, ce); addParticipation(c, o);
} }
assert(ce.all_of<Components::FT1InfoSHA1>()); assert(o.all_of<Components::FT1InfoSHA1>());
assert(ce.all_of<Components::FT1ChunkSHA1Cache>()); assert(o.all_of<Components::FT1ChunkSHA1Cache>());
const auto& cc = ce.get<Components::FT1ChunkSHA1Cache>(); const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
if (cc.haveChunk(sha1_chunk_hash)) { if (cc.haveChunk(sha1_chunk_hash)) {
std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n"; std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
// we have the chunk // we have the chunk
@ -735,7 +775,15 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
auto idx_vec = cc.chunkIndices(sha1_chunk_hash); auto idx_vec = cc.chunkIndices(sha1_chunk_hash);
assert(!idx_vec.empty()); assert(!idx_vec.empty());
const auto& info = ce.get<Components::FT1InfoSHA1>(); // CHECK IF TRANSFER IN PROGESS!!
for (const auto idx : idx_vec) {
if (_receiving_transfers.containsPeerChunk(e.group_number, e.peer_number, o, idx)) {
std::cerr << "SHA1_NGCFT1 error: " << e.group_number << ":" << e.peer_number << " offered chunk(" << idx << ") it is already receiving!!\n";
return false;
}
}
const auto& info = o.get<Components::FT1InfoSHA1>();
// TODO: check e.file_size // TODO: check e.file_size
assert(e.file_size == info.chunkSize(idx_vec.front())); assert(e.file_size == info.chunkSize(idx_vec.front()));
@ -743,7 +791,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
_receiving_transfers.emplaceChunk( _receiving_transfers.emplaceChunk(
e.group_number, e.peer_number, e.group_number, e.peer_number,
e.transfer_id, e.transfer_id,
ReceivingTransfers::Entry::Chunk{ce, idx_vec} ReceivingTransfers::Entry::Chunk{o, idx_vec}
); );
e.accept = true; e.accept = true;
@ -1108,6 +1156,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
// HACK: assume the message sender is participating. usually a safe bet. // HACK: assume the message sender is participating. usually a safe bet.
addParticipation(c, ce); addParticipation(c, ce);
// HACK: assume the message sender has all
ce.get_or_emplace<Components::RemoteHave>().others[c] = {true, {}};
if (!ce.all_of<Components::ReRequestInfoTimer>() && !ce.all_of<Components::FT1InfoSHA1>()) { if (!ce.all_of<Components::ReRequestInfoTimer>() && !ce.all_of<Components::FT1InfoSHA1>()) {
// TODO: check if already receiving // TODO: check if already receiving
_queue_content_want_info.push_back(ce); _queue_content_want_info.push_back(ce);
@ -1419,6 +1470,8 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
return false; return false;
} }
c.remove<ChunkPicker>();
for (const auto& [_, o] : _info_to_content) { for (const auto& [_, o] : _info_to_content) {
removeParticipation(c, o); removeParticipation(c, o);
@ -1484,6 +1537,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
for (const auto c_i : e.chunks) { for (const auto c_i : e.chunks) {
if (c_i >= num_total_chunks) { if (c_i >= num_total_chunks) {
std::cerr << "SHA1_NGCFT1 error: remote sent have with out-of-range chunk index!!!\n"; std::cerr << "SHA1_NGCFT1 error: remote sent have with out-of-range chunk index!!!\n";
std::cerr << info_hash << ": " << c_i << " >= " << num_total_chunks << "\n";
continue; continue;
} }

View File

@ -88,9 +88,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
bool _udp_only {false}; bool _udp_only {false};
size_t _max_concurrent_in {4}; size_t _max_concurrent_in {4};
size_t _max_concurrent_out {8}; size_t _max_concurrent_out {4};
// TODO: probably also includes running transfers rn (meh)
size_t _max_pending_requests {32}; // per content
public: public:
SHA1_NGCFT1( SHA1_NGCFT1(