From d19fc6ba30f80b1f2e1e52981cb591b87d60cae4 Mon Sep 17 00:00:00 2001
From: Green Sky
Date: Wed, 3 Jul 2024 12:11:20 +0200
Subject: [PATCH] new chunk picker, basically working

still needs work on the sending side and more bug fixes
---
 solanaceae/ngc_ft1_sha1/chunk_picker.cpp | 219 ++++++++++++++---------
 solanaceae/ngc_ft1_sha1/chunk_picker.hpp |   7 +-
 solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp  |  88 +++++++--
 solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp  |   4 +-
 4 files changed, 210 insertions(+), 108 deletions(-)

diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
index 227687f..44ae356 100644
--- a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
+++ b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
@@ -1,124 +1,169 @@
 #include "./chunk_picker.hpp"
 
-#include 
-#include 
+#include 
 
 #include "./components.hpp"
-#include 
-#include 
-
-#include 
-#include 
 
 #include 
 #include 
+
+void ChunkPicker::updateParticipation(
+	Contact3Handle c,
+	ObjectRegistry& objreg
+) {
+	// replaces them in place
+	participating.clear();
+	participating_unfinished.clear();
+
+	for (const Object ov : objreg.view<Components::SuspectedParticipants>()) {
+		const ObjectHandle o {objreg, ov};
+
+		participating.emplace(o);
+
+		if (!o.all_of<Components::FT1ChunkSHA1Cache>()) {
+			continue;
+		}
+
+		if (!o.get<Components::FT1ChunkSHA1Cache>().have_all) {
+			participating_unfinished.emplace(o, ParticipationEntry{});
+		}
+	}
+}
+
 std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
 	Contact3Handle c,
 	ObjectRegistry& objreg,
 	ReceivingTransfers& rt
 	//NGCFT1& nft
 ) {
+	if (!static_cast<bool>(c)) {
+		assert(false); return {};
+	}
+
+	if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
+		assert(false); return {};
+	}
+	const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
+
 	std::vector<ContentChunkR> req_ret;
 
 	// count running tf and open requests
-	// TODO: implement
-	const size_t num_requests = max_tf_chunk_requests;
+	const size_t num_ongoing_transfers = rt.sizePeer(group_number, peer_number);
+	// TODO: account for open requests
+	// TODO: base max on rate (chunks per sec), gonna be ass with variable chunk size
+	const size_t num_requests = std::max<int64_t>(0, max_tf_chunk_requests-num_ongoing_transfers);
 
 	// while n < X
-	while (false && !participating_unfinished.empty()) {
-		// round robin content (remember last obj)
-		if (!objreg.valid(participating_in_last)) {
-			participating_in_last = participating_unfinished.begin()->first;
-			//participating_in_last = *participating_unfinished.begin();
+
+	if (participating_unfinished.empty()) {
+		participating_in_last = entt::null;
+		return {};
+	}
+
+	// round robin content (remember last obj)
+	if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
+		participating_in_last = participating_unfinished.begin()->first;
+		//participating_in_last = *participating_unfinished.begin();
+	}
+	assert(objreg.valid(participating_in_last));
+
+	auto it = participating_unfinished.find(participating_in_last);
+	// hard limit robin rounds to array size times 100
+	for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*100; i++) {
+		if (it == participating_unfinished.end()) {
+			it = participating_unfinished.begin();
 		}
-		assert(objreg.valid(participating_in_last));
 
-		auto it = participating_unfinished.find(participating_in_last);
-		// hard limit robin rounds to array size time 100
-		for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*100; i++) {
-			if (it == participating_unfinished.end()) {
-				it = participating_unfinished.begin();
-			}
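(For reference: the candidate computation in the new code below never computes AND directly. BitSet only exposes merge (or) and invert in this patch, so the picker leans on De Morgan: A AND B = NOT(NOT A OR NOT B), with "have" doubling as inverted "want". A self-contained sketch of the same identity with a toy bitset; TinyBitSet and requestable are illustrative names, not part of the codebase.)

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

struct TinyBitSet {
	std::vector<uint8_t> bytes;

	TinyBitSet& invert(void) {
		for (auto& b : bytes) {
			b = static_cast<uint8_t>(~b);
		}
		return *this;
	}

	// bitwise or, like BitSet::merge() in chunk_picker.cpp
	TinyBitSet& merge(const TinyBitSet& other) {
		for (size_t i = 0; i < bytes.size() && i < other.bytes.size(); i++) {
			bytes[i] |= other.bytes[i];
		}
		return *this;
	}
};

// requestable = we_want AND they_have
//             = NOT(we_have) AND they_have      (want is inverted have)
//             = NOT(we_have OR NOT(they_have))  (De Morgan)
TinyBitSet requestable(TinyBitSet we_have, TinyBitSet they_have) {
	return we_have.merge(they_have.invert()).invert();
}
```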
+		if (it->second.skips < it->second.should_skip) {
+			it->second.skips++;
+			continue;
+		}
+
+		ObjectHandle o {objreg, it->first};
+
+		// intersect self have with other have
+		if (!o.all_of<Components::RemoteHave>()) {
+			// rare case where no other peer has anything
+			continue;
+		}
+
+		const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
+		if (cc.have_all) {
+			std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n";
+			continue;
+		}
+
+		const auto& others_have = o.get<Components::RemoteHave>().others;
+		auto other_it = others_have.find(c);
+		if (other_it == others_have.end()) {
+			// rare case where the other is participating but has nothing
+			continue;
+		}
+
+		const auto& other_have = other_it->second;
+
+		BitSet chunk_candidates = cc.have_chunk;
+		if (!other_have.have_all) {
+			// AND is the same as ~(~A | ~B)
+			// that means we leave chunk_candidates as-is (have is inverted want),
+			// merge (or) with the inverted remote have, and invert at the end
+			chunk_candidates
+				.merge(other_have.have.invert())
+				.invert();
+			// TODO: add intersect for more perf
+		} else {
+			chunk_candidates.invert();
+		}
+		const auto total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
+		auto& requested_chunks = o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
+
+		// TODO: trim off round up to 8, since they are now always set
+
+		// now select (globally) unrequested other have
+		// TODO: pick strategies
+		// TODO: how do we prioritize within a file?
+		//   - first (walk from start (or read head?))
+		//   - random (choose random start pos and walk)
+		//   - rarest (keep track of rarity and sort by that)
+		//   - streaming (use read head to determine time-critical chunks, potentially over-requesting; first (relative to stream head) otherwise)
+		//     maybe look into libtorrent's deadline stuff
+		//   - arbitrary priority maps/functions (and combine with above in ratios)
+
+		// simple, we use first
+		for (size_t i = 0; i < total_chunks && req_ret.size() < num_requests && i < chunk_candidates.size_bits(); i++) {
+			if (!chunk_candidates[i]) {
+				continue;
+			}
+
+			// i is a candidate we can request from the peer
+
+			// first check against double requests
+			if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const auto& x) -> bool {
+				return false;
+			}) != req_ret.cend()) {
+				// already in return array
+				// how did we get here? should we fast exit? with the simple-first strat we would want to
+				continue; // skip
+			}
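(Note that the double-request check just above is still a stub: its predicate always returns false, so nothing is ever filtered; the commit message's pending "more bug fixes" presumably covers this. A hedged sketch of the intended comparison follows — chunk_index is confirmed by chunk_picker.hpp below, while the object field name is an assumption, since ContentChunkR's full declaration is not part of this hunk.)

```cpp
// hypothetical completion of the find_if predicate above;
// 'x.object' is an assumed ContentChunkR field name, 'x.chunk_index' is real
const bool already_picked = std::find_if(
	req_ret.cbegin(), req_ret.cend(),
	[&](const auto& x) -> bool {
		return x.object == o && x.chunk_index == i;
	}
) != req_ret.cend();
```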
+
+			// second check against global requests (this might differ based on strat)
+			if (requested_chunks.count(i) != 0) {
 				continue;
 			}
 
-			const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
-			if (cc.have_all) {
-				std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n";
+			// third we check against globally running transfers (this might differ based on strat)
+			if (rt.containsChunk(o, i)) {
 				continue;
 			}
 
-			const auto& others_have = o.get<Components::RemoteHave>().others;
-			auto other_it = others_have.find(c);
-			if (other_it == others_have.end()) {
-				// rare case where the other is participating but has nothing
-				continue;
-			}
+			// if nothing else blocks this, add to ret
+			req_ret.push_back(ContentChunkR{o, i});
 
-			const auto& other_have = other_it->second;
-
-			BitSet chunk_candidates = cc.have_chunk;
-			if (!other_have.have_all) {
-				// AND is the same as ~(~A | ~B)
-				// that means we leave chunk_candidates as (have is inverted want)
-				// merge is or
-				// invert at the end
-				chunk_candidates
-					.merge(other_have.have.invert())
-					.invert();
-				// TODO: add intersect for more perf
-			} else {
-				chunk_candidates.invert();
-			}
-			const auto total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
-
-			// TODO: trim off round up to 8, since they are now always set
-
-			// now select (globaly) unrequested other have
-			// TODO: pick strategies
-			// TODO: how do we prioratize within a file?
-			//   - first (walk from start (or readhead?))
-			//   - random (choose random start pos and walk)
-			//   - rarest (keep track of rarity and sort by that)
-			//   - steaming (use read head to determain time critical chunks, potentially over requesting, first (relative to stream head) otherwise
-			//     maybe look into libtorrens deadline stuff
-			//   - arbitrary priority maps/functions (and combine with above in rations)
-
-			// simple, we use first
-			for (size_t i = 0; i < total_chunks && req_ret.size() < num_requests && i < chunk_candidates.size_bits(); i++) {
-				if (!chunk_candidates[i]) {
-					continue;
-				}
-
-				// i is a candidate we can request form peer
-
-				// first check against double requests
-				if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const auto& x) -> bool {
-					return false;
-				}) != req_ret.cend()) {
-					// already in return array
-					// how did we get here? should we fast exit? if simple-first strat, we would want to
-					continue; // skip
-				}
-
-				// TODO: also check against globally running transfers!!!
-
-
-				// if nothing else blocks this, add to ret
-				req_ret.push_back(ContentChunkR{o, i});
-			}
+			assert(requested_chunks.count(i) == 0);
+			requested_chunks[i] = 0.f;
 		}
 	}
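(The picker hard-codes the "first" strategy for now. One way the strategy TODO list above could later be factored is a pluggable selector over the candidate bitset; a sketch under assumed names (ChunkStrategy, pickFirst), using only BitSet operations that appear in this file. Rarest or streaming variants would keep the same signature and differ only in iteration order.)

```cpp
#include <cstddef>
#include <functional>
#include <optional>

// hypothetical: returns the next chunk index to request, or nullopt if none
using ChunkStrategy = std::function<std::optional<size_t>(
	const BitSet& candidates, // set bits = peer has it and we still want it
	size_t total_chunks
)>;

// "first": walk from the front, which is what the loop above does
static std::optional<size_t> pickFirst(const BitSet& candidates, size_t total_chunks) {
	for (size_t i = 0; i < total_chunks && i < candidates.size_bits(); i++) {
		if (candidates[i]) {
			return i;
		}
	}
	return std::nullopt;
}
```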
diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.hpp b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp
index 46b572d..2c6353c 100644
--- a/solanaceae/ngc_ft1_sha1/chunk_picker.hpp
+++ b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp
@@ -41,6 +41,11 @@ struct ChunkPicker {
 	entt::dense_set<Object> participating;
 	Object participating_in_last {entt::null};
 
+	void updateParticipation(
+		Contact3Handle c,
+		ObjectRegistry& objreg
+	);
+
 	// tick
 	//void sendInfoRequests();
 	// is this like a system?
@@ -54,7 +59,7 @@ struct ChunkPicker {
 		size_t chunk_index;
 	};
 	// returns list of chunks to request
-	std::vector<ContentChunkR> updateChunkRequests(
+	[[nodiscard]] std::vector<ContentChunkR> updateChunkRequests(
 		Contact3Handle c,
 		ObjectRegistry& objreg,
 		ReceivingTransfers& rt
diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
index c7fbe3a..6c41b87 100644
--- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
+++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
@@ -45,7 +45,7 @@ static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
 }
 
 void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash) {
-	for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) {
+	for (auto& [i_g, i_p, i_o, i_h, i_t] : _queue_requested_chunk) {
 		// if already in queue
 		if (i_g == group_number && i_p == peer_number && i_h == hash) {
 			// update timer
@@ -261,6 +261,7 @@ void SHA1_NGCFT1::iterate(float delta) {
 			float& timer = std::get<float>(*it);
 			timer += delta;
 
+			// forget after 10sec
 			if (timer >= 10.f) {
 				it = _queue_requested_chunk.erase(it);
 			} else {
@@ -290,8 +291,8 @@ void SHA1_NGCFT1::iterate(float delta) {
 		for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
 			it->second += delta;
 
-			// 20sec, TODO: config
-			if (it->second >= 20.f) {
+			// 15sec, TODO: config
+			if (it->second >= 15.f) {
 				it = ftchunk_requested.chunks.erase(it);
 			} else {
 				it++;
 			}
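(Both timeout paths above follow the same pattern: age each entry by delta every tick and drop it once it crosses a cutoff, so a forgotten request can be re-issued later. A condensed sketch of that pattern, with std::map standing in for the component's actual container and all names illustrative:)

```cpp
#include <cstddef>
#include <map>

// chunk index -> seconds since the request went out
using RequestAges = std::map<size_t, float>;

// age entries and forget the ones older than timeout (15s above)
void tickRequests(RequestAges& ages, float delta, float timeout) {
	for (auto it = ages.begin(); it != ages.end();) {
		it->second += delta;
		if (it->second >= timeout) {
			it = ages.erase(it); // timed out, allow re-requesting
		} else {
			it++;
		}
	}
}
```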
@@ -386,6 +387,7 @@ void SHA1_NGCFT1::iterate(float delta) {
 			std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
 		}
+#if 0
 	} else if (!_queue_content_want_chunk.empty()) {
 		const auto ce = _queue_content_want_chunk.front();
@@ -450,7 +452,47 @@ void SHA1_NGCFT1::iterate(float delta) {
 			}
 		}
 	}
+#endif
 	}
 
+	// new chunk picker code
+	_cr.view<ChunkPicker>().each([this](const Contact3 cv, ChunkPicker& cp) {
+		Contact3Handle c{_cr, cv};
+		// HACK: expensive, don't do every tick, only on events
+		// do verification in debug instead?
+		cp.updateParticipation(
+			c,
+			_os.registry()
+		);
+
+		assert(!cp.participating.empty());
+
+		auto new_requests = cp.updateChunkRequests(
+			c,
+			_os.registry(),
+			_receiving_transfers
+		);
+
+		if (new_requests.empty()) {
+			return;
+		}
+
+		assert(c.all_of<Contact::Components::ToxGroupPeerEphemeral>());
+		const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
+
+		for (const auto [r_o, r_idx] : new_requests) {
+			auto& cc = r_o.get<Components::FT1ChunkSHA1Cache>();
+			const auto& info = r_o.get<Components::FT1InfoSHA1>();
+
+			// request chunk_idx
+			_nft.NGC_FT1_send_request_private(
+				group_number, peer_number,
+				static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
+				info.chunks.at(r_idx).data.data(), info.chunks.at(r_idx).size()
+			);
+			std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(r_idx) << "] from " << group_number << ":" << peer_number << "\n";
+		}
+	});
 }
@@ -645,22 +687,22 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
 		return false;
 	}
 
-	auto ce = _chunks.at(chunk_hash);
+	auto o = _chunks.at(chunk_hash);
 
 	{ // they advertise interest in the content
 		const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
-		addParticipation(c, ce);
+		addParticipation(c, o);
 	}
 
-	assert(ce.all_of<Components::FT1ChunkSHA1Cache>());
+	assert(o.all_of<Components::FT1ChunkSHA1Cache>());
 
-	if (!ce.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
+	if (!o.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
 		// we dont have the chunk
 		return false;
 	}
 
 	// queue good request
-	queueUpRequestChunk(e.group_number, e.peer_number, ce, chunk_hash);
+	queueUpRequestChunk(e.group_number, e.peer_number, o, chunk_hash);
 } else {
 	assert(false && "unhandled case");
 }
@@ -711,19 +753,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
 		return false;
 	}
 
-	auto ce = _chunks.at(sha1_chunk_hash);
-
-	// CHECK IF TRANSFER IN PROGESS!!
+	auto o = _chunks.at(sha1_chunk_hash);
 
 	{ // they have the content (probably, might be fake, should move this to done)
 		const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
-		addParticipation(c, ce);
+		addParticipation(c, o);
 	}
 
-	assert(ce.all_of<Components::FT1InfoSHA1>());
-	assert(ce.all_of<Components::FT1ChunkSHA1Cache>());
+	assert(o.all_of<Components::FT1InfoSHA1>());
+	assert(o.all_of<Components::FT1ChunkSHA1Cache>());
 
-	const auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
+	const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
 	if (cc.haveChunk(sha1_chunk_hash)) {
 		std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
 		// we have the chunk
@@ -735,7 +775,15 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
 	auto idx_vec = cc.chunkIndices(sha1_chunk_hash);
 	assert(!idx_vec.empty());
 
-	const auto& info = ce.get<Components::FT1InfoSHA1>();
+	// CHECK IF TRANSFER IN PROGRESS!!
+	for (const auto idx : idx_vec) {
+		if (_receiving_transfers.containsPeerChunk(e.group_number, e.peer_number, o, idx)) {
+			std::cerr << "SHA1_NGCFT1 error: " << e.group_number << ":" << e.peer_number << " offered chunk(" << idx << ") that it is already receiving!!\n";
+			return false;
+		}
+	}
+
+	const auto& info = o.get<Components::FT1InfoSHA1>();
 
 	// TODO: check e.file_size
 	assert(e.file_size == info.chunkSize(idx_vec.front()));
@@ -743,7 +791,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
 	_receiving_transfers.emplaceChunk(
 		e.group_number, e.peer_number, e.transfer_id,
-		ReceivingTransfers::Entry::Chunk{ce, idx_vec}
+		ReceivingTransfers::Entry::Chunk{o, idx_vec}
 	);
 
 	e.accept = true;
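(The per-tick updateParticipation call above rescans every object and is flagged as a hack for exactly that reason. As its comment suggests, it could run only on events; one hedged way to get there is an empty tag component that the event handlers attach and the tick consumes. ChunkPickerNeedsUpdate and this wiring are purely illustrative, not part of this patch:)

```cpp
// hypothetical tag set by event handlers (recv_request, recv_init, ft1_have, ...)
struct ChunkPickerNeedsUpdate {};

// handler side: mark the contact instead of letting the tick rescan everything
//   c.emplace_or_replace<ChunkPickerNeedsUpdate>();

// tick side: only refresh marked contacts, then clear the tag
_cr.view<ChunkPicker, ChunkPickerNeedsUpdate>().each([this](const Contact3 cv, ChunkPicker& cp) {
	Contact3Handle c{_cr, cv};
	cp.updateParticipation(c, _os.registry());
	c.remove<ChunkPickerNeedsUpdate>();
});
```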
@@ -1108,6 +1156,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
 	// HACK: assume the message sender is participating. usually a safe bet.
 	addParticipation(c, ce);
 
+	// HACK: assume the message sender has all
+	ce.get_or_emplace<Components::RemoteHave>().others[c] = {true, {}};
+
 	if (!ce.all_of<Components::FT1InfoSHA1>() && !ce.all_of<Components::ReRequestInfoTimer>()) {
 		// TODO: check if already receiving
 		_queue_content_want_info.push_back(ce);
@@ -1419,6 +1470,8 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
 		return false;
 	}
 
+	c.remove<ChunkPicker>();
+
 	for (const auto& [_, o] : _info_to_content) {
 		removeParticipation(c, o);
 
@@ -1484,6 +1537,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
 	for (const auto c_i : e.chunks) {
 		if (c_i >= num_total_chunks) {
 			std::cerr << "SHA1_NGCFT1 error: remote sent have with out-of-range chunk index!!!\n";
+			std::cerr << info_hash << ": " << c_i << " >= " << num_total_chunks << "\n";
 			continue;
 		}
 
diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
index 22bb203..2c5aad2 100644
--- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
+++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
@@ -88,9 +88,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
 	bool _udp_only {false};
 
 	size_t _max_concurrent_in {4};
-	size_t _max_concurrent_out {8};
-	// TODO: probably also includes running transfers rn (meh)
-	size_t _max_pending_requests {32}; // per content
+	size_t _max_concurrent_out {4};
 
 public:
 	SHA1_NGCFT1(
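(The have handler above validates chunk indices before recording them. For completeness, a sketch of folding a received chunk list into a RemoteHave-style per-peer entry; RemoteHaveEntry is an assumed shape mirroring the {have_all, have} usage in this patch, with std::vector<bool> standing in for BitSet since the component's real definition is not part of this diff:)

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// assumed shape of a per-peer have entry ({have_all, have} as used above)
struct RemoteHaveEntry {
	bool have_all {false};
	std::vector<bool> have; // stand-in for BitSet
};

void applyHave(RemoteHaveEntry& entry, const std::vector<uint32_t>& chunks, size_t num_total_chunks) {
	if (entry.have_all) {
		return; // nothing more to record
	}
	entry.have.resize(num_total_chunks);
	for (const auto c_i : chunks) {
		if (c_i >= num_total_chunks) {
			// remote sent an out-of-range index; skip it, like the handler above
			continue;
		}
		entry.have[c_i] = true;
	}
}
```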