diff --git a/CMakeLists.txt b/CMakeLists.txt index 5b2c67d..ffbe44b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -58,6 +58,7 @@ add_library(solanaceae_sha1_ngcft1 ./solanaceae/ngc_ft1_sha1/components.cpp ./solanaceae/ngc_ft1_sha1/chunk_picker.hpp + ./solanaceae/ngc_ft1_sha1/chunk_picker.cpp ./solanaceae/ngc_ft1_sha1/participation.hpp ./solanaceae/ngc_ft1_sha1/participation.cpp diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp new file mode 100644 index 0000000..227687f --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp @@ -0,0 +1,132 @@ +#include "./chunk_picker.hpp" + +#include +#include + +#include "./components.hpp" + +#include +#include + +#include +#include +#include + +#include + +std::vector ChunkPicker::updateChunkRequests( + Contact3Handle c, + ObjectRegistry& objreg, + ReceivingTransfers& rt + //NGCFT1& nft +) { + std::vector req_ret; + + // count running tf and open requests + // TODO: implement + const size_t num_requests = max_tf_chunk_requests; + + // while n < X + while (false && !participating_unfinished.empty()) { + // round robin content (remember last obj) + if (!objreg.valid(participating_in_last)) { + participating_in_last = participating_unfinished.begin()->first; + //participating_in_last = *participating_unfinished.begin(); + } + assert(objreg.valid(participating_in_last)); + + auto it = participating_unfinished.find(participating_in_last); + // hard limit robin rounds to array size time 100 + for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*100; i++) { + if (it == participating_unfinished.end()) { + it = participating_unfinished.begin(); + } + + if (it->second.skips < it->second.should_skip) { + it->second.skips++; + continue; + } + + ObjectHandle o {objreg, it->first}; + + // intersect self have with other have + if (!o.all_of()) { + // rare case where no one other has anything + continue; + } + + const auto& cc = o.get(); + if (cc.have_all) { + std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n"; + continue; + } + + const auto& others_have = o.get().others; + auto other_it = others_have.find(c); + if (other_it == others_have.end()) { + // rare case where the other is participating but has nothing + continue; + } + + const auto& other_have = other_it->second; + + BitSet chunk_candidates = cc.have_chunk; + if (!other_have.have_all) { + // AND is the same as ~(~A | ~B) + // that means we leave chunk_candidates as (have is inverted want) + // merge is or + // invert at the end + chunk_candidates + .merge(other_have.have.invert()) + .invert(); + // TODO: add intersect for more perf + } else { + chunk_candidates.invert(); + } + const auto total_chunks = o.get().chunks.size(); + // TODO: trim off round up to 8, since they are now always set + + // now select (globaly) unrequested other have + // TODO: pick strategies + // TODO: how do we prioratize within a file? + // - first (walk from start (or readhead?)) + // - random (choose random start pos and walk) + // - rarest (keep track of rarity and sort by that) + // - steaming (use read head to determain time critical chunks, potentially over requesting, first (relative to stream head) otherwise + // maybe look into libtorrens deadline stuff + // - arbitrary priority maps/functions (and combine with above in rations) + + // simple, we use first + for (size_t i = 0; i < total_chunks && req_ret.size() < num_requests && i < chunk_candidates.size_bits(); i++) { + if (!chunk_candidates[i]) { + continue; + } + + // i is a candidate we can request form peer + + // first check against double requests + if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const auto& x) -> bool { + return false; + }) != req_ret.cend()) { + // already in return array + // how did we get here? should we fast exit? if simple-first strat, we would want to + continue; // skip + } + + // TODO: also check against globally running transfers!!! + + + // if nothing else blocks this, add to ret + req_ret.push_back(ContentChunkR{o, i}); + } + } + } + + // -- no -- (just compat with old code, ignore) + // if n < X + // optimistically request 1 chunk other does not have + // (don't mark es requested? or lower cooldown to re-request?) + + return req_ret; +} + diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.hpp b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp index df99887..46b572d 100644 --- a/solanaceae/ngc_ft1_sha1/chunk_picker.hpp +++ b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp @@ -1,19 +1,18 @@ #pragma once -#include #include #include #include "./components.hpp" +#include "./receiving_transfers.hpp" + #include #include #include #include -#include - //#include // goal is to always keep 2 transfers running and X(6) requests queued up @@ -58,117 +57,9 @@ struct ChunkPicker { std::vector updateChunkRequests( Contact3Handle c, ObjectRegistry& objreg, - size_t num_requests + ReceivingTransfers& rt //NGCFT1& nft - ) { - std::vector req_ret; - - // count running tf and open requests - // TODO: replace num_requests with this logic - - // while n < X - while (false && !participating_unfinished.empty()) { - // round robin content (remember last obj) - if (!objreg.valid(participating_in_last)) { - participating_in_last = participating_unfinished.begin()->first; - //participating_in_last = *participating_unfinished.begin(); - } - assert(objreg.valid(participating_in_last)); - - auto it = participating_unfinished.find(participating_in_last); - // hard limit robin rounds to array size time 100 - for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*100; i++) { - if (it == participating_unfinished.end()) { - it = participating_unfinished.begin(); - } - - if (it->second.skips < it->second.should_skip) { - it->second.skips++; - continue; - } - - ObjectHandle o {objreg, it->first}; - - // intersect self have with other have - if (!o.all_of()) { - // rare case where no one other has anything - continue; - } - - const auto& cc = o.get(); - if (cc.have_all) { - std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n"; - continue; - } - - const auto& others_have = o.get().others; - auto other_it = others_have.find(c); - if (other_it == others_have.end()) { - // rare case where the other is participating but has nothing - continue; - } - - const auto& other_have = other_it->second; - - BitSet chunk_candidates = cc.have_chunk; - if (!other_have.have_all) { - // AND is the same as ~(~A | ~B) - // that means we leave chunk_candidates as (have is inverted want) - // merge is or - // invert at the end - chunk_candidates - .merge(other_have.have.invert()) - .invert(); - // TODO: add intersect for more perf - } else { - chunk_candidates.invert(); - } - const auto total_chunks = o.get().chunks.size(); - // TODO: trim off round up to 8, since they are now always set - - // now select (globaly) unrequested other have - // TODO: pick strategies - // TODO: how do we prioratize within a file? - // - first (walk from start (or readhead?)) - // - random (choose random start pos and walk) - // - rarest (keep track of rarity and sort by that) - // - steaming (use read head to determain time critical chunks, potentially over requesting, first (relative to stream head) otherwise - // maybe look into libtorrens deadline stuff - // - arbitrary priority maps/functions (and combine with above in rations) - - // simple, we use first - for (size_t i = 0; i < total_chunks && req_ret.size() < num_requests && i < chunk_candidates.size_bits(); i++) { - if (!chunk_candidates[i]) { - continue; - } - - // i is a candidate we can request form peer - - // first check against double requests - if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const auto& x) -> bool { - return false; - }) != req_ret.cend()) { - // already in return array - // how did we get here? should we fast exit? if simple-first strat, we would want to - continue; // skip - } - - // TODO: also check against globally running transfers!!! - - - // if nothing else blocks this, add to ret - req_ret.push_back(ContentChunkR{o, i}); - } - } - } - - // -- no -- (just compat with old code, ignore) - // if n < X - // optimistically request 1 chunk other does not have - // (don't mark es requested? or lower cooldown to re-request?) - - return req_ret; - } + ); // - reset on disconnect? void resetPeer( diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp index 993df4b..217cee6 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp @@ -77,11 +77,6 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public std::atomic_bool _info_builder_dirty {false}; std::mutex _info_builder_queue_mutex; - //struct InfoBuilderEntry { - //// called on completion on the iterate thread - //// (owning) - //std::function fn; - //}; using InfoBuilderEntry = std::function; std::list _info_builder_queue; @@ -94,7 +89,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public public: // TODO: config bool _udp_only {false}; - size_t _max_concurrent_in {6}; + size_t _max_concurrent_in {4}; size_t _max_concurrent_out {8}; // TODO: probably also includes running transfers rn (meh) size_t _max_pending_requests {32}; // per content