From b53e291c6897c9aef44e89cd9d9cfa5008db54a4 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Fri, 28 Jun 2024 15:13:17 +0200 Subject: [PATCH] wip chunk picker (still unused) and a small refactor --- solanaceae/ngc_ft1_sha1/chunk_picker.hpp | 178 +++++++++++++++++++++++ solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp | 50 ++++++- solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp | 3 + 3 files changed, 224 insertions(+), 7 deletions(-) create mode 100644 solanaceae/ngc_ft1_sha1/chunk_picker.hpp diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.hpp b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp new file mode 100644 index 0000000..df99887 --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp @@ -0,0 +1,178 @@ +#pragma once + +#include +#include +#include + +#include "./components.hpp" + +#include +#include + +#include +#include + +#include + +//#include + +// goal is to always keep 2 transfers running and X(6) requests queued up +// per peer + +// contact component? +struct ChunkPicker { + // max transfers + static constexpr size_t max_tf_info_requests {1}; + static constexpr size_t max_tf_chunk_requests {2}; + + // max outstanding requests + // TODO: should this include transfers? + static constexpr size_t max_open_info_requests {1}; + const size_t max_open_chunk_requests {6}; + + // TODO: handle with hash utils? + struct ParticipationEntry { + ParticipationEntry(void) {} + // skips in round robin -> lower should_skip => higher priority + uint16_t should_skip {2}; // 0 high, 8 low (double each time? 0,1,2,4,8) + uint16_t skips {2}; + }; + // TODO: only unfinished? + entt::dense_map participating_unfinished; + entt::dense_set participating; + Object participating_in_last {entt::null}; + + // tick + //void sendInfoRequests(); + // is this like a system? + // TODO: only update on: + // - transfer start? + // - transfer done + // - request timed out + // - reset on disconnect? + struct ContentChunkR { + ObjectHandle object; + size_t chunk_index; + }; + // returns list of chunks to request + std::vector updateChunkRequests( + Contact3Handle c, + ObjectRegistry& objreg, + size_t num_requests + //NGCFT1& nft + ) { + std::vector req_ret; + + // count running tf and open requests + // TODO: replace num_requests with this logic + + // while n < X + while (false && !participating_unfinished.empty()) { + // round robin content (remember last obj) + if (!objreg.valid(participating_in_last)) { + participating_in_last = participating_unfinished.begin()->first; + //participating_in_last = *participating_unfinished.begin(); + } + assert(objreg.valid(participating_in_last)); + + auto it = participating_unfinished.find(participating_in_last); + // hard limit robin rounds to array size time 100 + for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*100; i++) { + if (it == participating_unfinished.end()) { + it = participating_unfinished.begin(); + } + + if (it->second.skips < it->second.should_skip) { + it->second.skips++; + continue; + } + + ObjectHandle o {objreg, it->first}; + + // intersect self have with other have + if (!o.all_of()) { + // rare case where no one other has anything + continue; + } + + const auto& cc = o.get(); + if (cc.have_all) { + std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n"; + continue; + } + + const auto& others_have = o.get().others; + auto other_it = others_have.find(c); + if (other_it == others_have.end()) { + // rare case where the other is participating but has nothing + continue; + } + + const auto& other_have = other_it->second; + + BitSet chunk_candidates = cc.have_chunk; + if (!other_have.have_all) { + // AND is the same as ~(~A | ~B) + // that means we leave chunk_candidates as (have is inverted want) + // merge is or + // invert at the end + chunk_candidates + .merge(other_have.have.invert()) + .invert(); + // TODO: add intersect for more perf + } else { + chunk_candidates.invert(); + } + const auto total_chunks = o.get().chunks.size(); + // TODO: trim off round up to 8, since they are now always set + + // now select (globaly) unrequested other have + // TODO: pick strategies + // TODO: how do we prioratize within a file? + // - first (walk from start (or readhead?)) + // - random (choose random start pos and walk) + // - rarest (keep track of rarity and sort by that) + // - steaming (use read head to determain time critical chunks, potentially over requesting, first (relative to stream head) otherwise + // maybe look into libtorrens deadline stuff + // - arbitrary priority maps/functions (and combine with above in rations) + + // simple, we use first + for (size_t i = 0; i < total_chunks && req_ret.size() < num_requests && i < chunk_candidates.size_bits(); i++) { + if (!chunk_candidates[i]) { + continue; + } + + // i is a candidate we can request form peer + + // first check against double requests + if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const auto& x) -> bool { + return false; + }) != req_ret.cend()) { + // already in return array + // how did we get here? should we fast exit? if simple-first strat, we would want to + continue; // skip + } + + // TODO: also check against globally running transfers!!! + + + // if nothing else blocks this, add to ret + req_ret.push_back(ContentChunkR{o, i}); + } + } + } + + // -- no -- (just compat with old code, ignore) + // if n < X + // optimistically request 1 chunk other does not have + // (don't mark es requested? or lower cooldown to re-request?) + + return req_ret; + } + + // - reset on disconnect? + void resetPeer( + Contact3Handle c + ); +}; + diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp index 0ee1e16..da214ba 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp @@ -17,6 +17,7 @@ #include "./file_rw_mapped.hpp" #include "./components.hpp" +#include "./chunk_picker.hpp" #include #include @@ -113,6 +114,36 @@ void SHA1_NGCFT1::updateMessages(ObjectHandle ce) { } } +bool SHA1_NGCFT1::addParticipation(Contact3 c, ObjectHandle o) { + bool was_new {false}; + + if (static_cast(o)) { + const auto [_, inserted] = o.get_or_emplace().participants.emplace(c); + was_new = inserted; + } + + if (_cr.valid(c)) { + const auto [_, inserted] = _cr.get_or_emplace(c).participating.emplace(o); + was_new = was_new || inserted; + + // TODO: if not have_all + _cr.get_or_emplace(c).participating_unfinished.emplace(o, ChunkPicker::ParticipationEntry{}); + } + + return was_new; +} + +void SHA1_NGCFT1::removeParticipation(Contact3 c, ObjectHandle o) { + if (static_cast(o) && o.all_of()) { + o.get().participants.erase(c); + } + + if (_cr.valid(c) && _cr.all_of(c)) { + _cr.get(c).participating.erase(o); + _cr.get(c).participating_unfinished.erase(o); + } +} + std::optional> SHA1_NGCFT1::selectPeerForRequest(ObjectHandle ce) { // get a list of peers we can request this file from std::vector> tox_peers; @@ -673,7 +704,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { { // they advertise interest in the content const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); - ce.get_or_emplace().participants.emplace(c); + addParticipation(c, ce); } assert(ce.all_of()); @@ -740,7 +771,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) { { // they have the content (probably, might be fake, should move this to done) const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); - ce.get_or_emplace().participants.emplace(c); + addParticipation(c, ce); } assert(ce.all_of()); @@ -1134,7 +1165,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) { ce.get_or_emplace().messages.push_back({reg, new_msg_e}); reg_ptr->emplace(new_msg_e, ce); - ce.get_or_emplace().participants.emplace(c); + // HACK: assume the message sender is participating. usually a safe bet. + addParticipation(c, ce); if (!ce.all_of() && !ce.all_of()) { // TODO: check if already receiving @@ -1448,9 +1480,7 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) { } for (const auto& [_, h] : _info_to_content) { - if (h.all_of()) { - h.get().participants.erase(ch); - } + removeParticipation(ch, h); if (h.all_of()) { h.get().others.erase(ch); @@ -1496,6 +1526,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) { const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); + // we might not know yet + addParticipation(c, ce); + auto& remote_have = ce.get_or_emplace().others; if (!remote_have.contains(c)) { // init @@ -1572,6 +1605,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) { const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); + // we might not know yet + addParticipation(c, ce); + auto& remote_have = ce.get_or_emplace().others; if (!remote_have.contains(c)) { // init @@ -1636,7 +1672,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) { // add them to participants auto ce = itc_it->second; const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); - const auto [_, was_new] = ce.get_or_emplace().participants.emplace(c); + const bool was_new = addParticipation(c, ce); if (was_new) { std::cout << "SHA1_NGCFT1: and we where interested!\n"; // we should probably send the bitset back here / add to queue (can be multiple packets) diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp index 8fa1038..0fcf93e 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp @@ -108,6 +108,9 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public void updateMessages(ObjectHandle ce); + bool addParticipation(Contact3 c, ObjectHandle o); + void removeParticipation(Contact3 c, ObjectHandle o); + std::optional> selectPeerForRequest(ObjectHandle ce); public: // TODO: config