diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
index 44ae356..644eaf8 100644
--- a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
+++ b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
@@ -35,7 +35,8 @@ void ChunkPicker::updateParticipation(
 std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
 	Contact3Handle c,
 	ObjectRegistry& objreg,
-	ReceivingTransfers& rt
+	ReceivingTransfers& rt,
+	const size_t open_requests
 	//NGCFT1& nft
 ) {
 	if (!static_cast<bool>(c)) {
@@ -47,21 +48,23 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
 	}
 	const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
 
+	if (participating_unfinished.empty()) {
+		participating_in_last = entt::null;
+		return {};
+	}
+
 	std::vector<ContentChunkR> req_ret;
 
 	// count running tf and open requests
 	const size_t num_ongoing_transfers = rt.sizePeer(group_number, peer_number);
 	// TODO: account for open requests
+	const int64_t num_total = num_ongoing_transfers + open_requests;
 	// TODO: base max on rate(chunks per sec), gonna be ass with variable chunk size
-	const size_t num_requests = std::max<int64_t>(0, max_tf_chunk_requests-num_ongoing_transfers);
+	const size_t num_requests = std::max<int64_t>(0, int64_t(max_tf_chunk_requests)-num_total);
+	std::cerr << "CP: want " << num_requests << "(rt:" << num_ongoing_transfers << " or:" << open_requests << ") from " << group_number << ":" << peer_number << "\n";
 
 	// while n < X
 
-	if (participating_unfinished.empty()) {
-		participating_in_last = entt::null;
-		return {};
-	}
-
 	// round robin content (remember last obj)
 	if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
 		participating_in_last = participating_unfinished.begin()->first;
@@ -133,28 +136,29 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
 		//   - arbitrary priority maps/functions (and combine with above in rations)
 
 		// simple, we use first
+		// TODO: optimize simple and start at first chunk we dont have
 		for (size_t i = 0; i < total_chunks && req_ret.size() < num_requests && i < chunk_candidates.size_bits(); i++) {
 			if (!chunk_candidates[i]) {
 				continue;
 			}
 
-			// i is a candidate we can request form peer
+			// i is a potential candidate we can request from peer
 
-			// first check against double requests
-			if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const auto& x) -> bool {
-				return false;
+			// - check against double requests
+			if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const ContentChunkR& x) -> bool {
+				return x.object == o && x.chunk_index == i;
 			}) != req_ret.cend()) {
 				// already in return array
 				// how did we get here? should we fast exit? if simple-first strat, we would want to continue;
 				continue; // skip
 			}
 
-			// second check against global requests (this might differ based on strat)
+			// - check against global requests (this might differ based on strat)
 			if (requested_chunks.count(i) != 0) {
 				continue;
 			}
 
-			// third we check against globally running transfers (this might differ based on strat)
+			// - we check against globally running transfers (this might differ based on strat)
 			if (rt.containsChunk(o, i)) {
 				continue;
 			}
@@ -162,11 +166,16 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
 			// if nothing else blocks this, add to ret
 			req_ret.push_back(ContentChunkR{o, i});
 
-			assert(requested_chunks.count(i) == 0);
-			requested_chunks[i] = 0.f;
+			// TODO: move this after packet was sent successfully
+			// (move net in? hmm)
+			requested_chunks[i] = Components::FT1ChunkSHA1Requested::Entry{0.f, c};
 		}
 	}
 
+	if (req_ret.size() < num_requests) {
+		std::cerr << "CP: could not fulfil, " << group_number << ":" << peer_number << " only has " << req_ret.size() << " candidates\n";
+	}
+
 	// -- no -- (just compat with old code, ignore)
 	// if n < X
 	//   optimistically request 1 chunk other does not have
diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.hpp b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp
index 2c6353c..60584dc 100644
--- a/solanaceae/ngc_ft1_sha1/chunk_picker.hpp
+++ b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp
@@ -22,12 +22,12 @@ struct ChunkPicker {
 
 	// max transfers
 	static constexpr size_t max_tf_info_requests {1};
-	static constexpr size_t max_tf_chunk_requests {2};
+	static constexpr size_t max_tf_chunk_requests {3};
 
-	// max outstanding requests
-	// TODO: should this include transfers?
-	static constexpr size_t max_open_info_requests {1};
-	const size_t max_open_chunk_requests {6};
+	//// max outstanding requests
+	//// TODO: should this include transfers?
+	//static constexpr size_t max_open_info_requests {1};
+	//const size_t max_open_chunk_requests {6};
 
 	// TODO: handle with hash utils?
 	struct ParticipationEntry {
@@ -62,7 +62,8 @@ struct ChunkPicker {
 	[[nodiscard]] std::vector<ContentChunkR> updateChunkRequests(
 		Contact3Handle c,
 		ObjectRegistry& objreg,
-		ReceivingTransfers& rt
+		ReceivingTransfers& rt,
+		const size_t open_requests
 		//NGCFT1& nft
 	);
 
diff --git a/solanaceae/ngc_ft1_sha1/components.hpp b/solanaceae/ngc_ft1_sha1/components.hpp
index c518bd9..7f95622 100644
--- a/solanaceae/ngc_ft1_sha1/components.hpp
+++ b/solanaceae/ngc_ft1_sha1/components.hpp
@@ -49,7 +49,11 @@ namespace Components {
 
 	struct FT1ChunkSHA1Requested {
 		// requested chunks with a timer since last request
-		entt::dense_map<size_t, float> chunks;
+		struct Entry {
+			float timer {0.f};
+			Contact3 c {entt::null};
+		};
+		entt::dense_map<size_t, Entry> chunks;
 	};
 
 	// TODO: once announce is shipped, remove the "Suspected"
diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
index 6c41b87..e9c6fcf 100644
--- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
+++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
@@ -217,6 +217,7 @@ SHA1_NGCFT1::SHA1_NGCFT1(
 }
 
 void SHA1_NGCFT1::iterate(float delta) {
+	std::cerr << "---------- new tick ----------\n";
 	// info builder queue
 	if (_info_builder_dirty) {
 		std::lock_guard l{_info_builder_queue_mutex};
@@ -229,6 +230,8 @@ void SHA1_NGCFT1::iterate(float delta) {
 		_info_builder_queue.clear();
 	}
 
+	entt::dense_map<Contact3, size_t> _peer_open_requests;
+
 	{ // timers
 		// sending transfers
 		for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) {
@@ -287,14 +290,15 @@ void SHA1_NGCFT1::iterate(float delta) {
 			}
 		}
 		{ // requested chunk timers
-			_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
+			_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta, &_peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
 				for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
-					it->second += delta;
+					it->second.timer += delta;
 
 					// 15sec, TODO: config
-					if (it->second >= 15.f) {
+					if (it->second.timer >= 15.f) {
 						it = ftchunk_requested.chunks.erase(it);
 					} else {
+						_peer_open_requests[it->second.c] += 1;
 						it++;
 					}
 				}
@@ -387,76 +391,10 @@ void SHA1_NGCFT1::iterate(float delta) {
 
 				std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
 			}
-#if 0
-		} else if (!_queue_content_want_chunk.empty()) {
-			const auto ce = _queue_content_want_chunk.front();
-
-			auto& requested_chunks = ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
-			if (requested_chunks.size() < _max_pending_requests) {
-
-				// select chunk/make sure we still need one
-				auto selected_peer_opt = selectPeerForRequest(ce);
-				if (selected_peer_opt.has_value()) {
-					const auto [group_number, peer_number] = selected_peer_opt.value();
-					//std::cout << "SHA1_NGCFT1: should ask " << group_number << ":" << peer_number << " for content here\n";
-					auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
-					const auto& info = ce.get<Components::FT1InfoSHA1>();
-
-					if (cc.have_all) {
-						_queue_content_want_chunk.pop_front();
-					} else {
-						// naive, choose first chunk we dont have (double requests!!)
-						// TODO: piece picker, choose what other have (invert selectPeerForRequest)
-						for (size_t chunk_idx = 0; chunk_idx < info.chunks.size() /* cc.total_ */; chunk_idx++) {
-							if (cc.have_chunk[chunk_idx]) {
-								continue;
-							}
-
-							// check by hash
-							if (cc.haveChunk(info.chunks.at(chunk_idx))) {
-								// TODO: fix this, a completed chunk should fill all the indecies it occupies
-								cc.have_chunk.set(chunk_idx);
-								cc.have_count += 1;
-								if (cc.have_count == info.chunks.size()) {
-									cc.have_all = true;
-									cc.have_chunk = BitSet(0); // conserve space
-									break;
-								}
-								continue;
-							}
-
-							if (requested_chunks.count(chunk_idx)) {
-								// already requested
-								continue;
-							}
-
-							// request chunk_idx
-							_nft.NGC_FT1_send_request_private(
-								group_number, peer_number,
-								static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
-								info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size()
-							);
-							requested_chunks[chunk_idx] = 0.f;
-							std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n";
-
-							break;
-						}
-
-						// ...
-
-						// TODO: properly determine
-						if (!cc.have_all) {
-							_queue_content_want_chunk.push_back(ce);
-						}
-						_queue_content_want_chunk.pop_front();
-					}
-				}
-			}
-#endif
 		}
 
 		// new chunk picker code
-		_cr.view<ChunkPicker>().each([this](const Contact3 cv, ChunkPicker& cp) {
+		_cr.view<ChunkPicker>().each([this, &_peer_open_requests](const Contact3 cv, ChunkPicker& cp) {
 			Contact3Handle c{_cr, cv};
 			// HACK: expensive, dont do every tick, only on events
 			// do verification in debug instead?
@@ -467,10 +405,16 @@ void SHA1_NGCFT1::iterate(float delta) {
 
 			assert(!cp.participating.empty());
 
+			size_t peer_open_request = 0;
+			if (_peer_open_requests.contains(c)) {
+				peer_open_request += _peer_open_requests.at(c);
+			}
+
 			auto new_requests = cp.updateChunkRequests(
 				c,
 				_os.registry(),
-				_receiving_transfers
+				_receiving_transfers,
+				peer_open_request
 			);
 
 			if (new_requests.empty()) {
@@ -796,6 +740,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
 
 		e.accept = true;
 
+		// now running, remove from requested
+		for (const auto it : _receiving_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id).getChunk().chunk_indices) {
+			o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
+		}
+
 		std::cout << "SHA1_NGCFT1: accepted chunk [" << SHA1Digest{sha1_chunk_hash} << "]\n";
 	} else {
 		assert(false && "unhandled case");
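
Reviewer note, not part of the patch: the core change is that the per-peer request budget now subtracts both running transfers and still-open (unanswered) chunk requests, with a request expiring 15 seconds after it was sent. Below is a minimal, self-contained sketch of that rule. The constant max_tf_chunk_requests and the 15s timeout mirror the patch; requestBudget, the plain std::unordered_map, and main are hypothetical stand-ins for the entt-based components (FT1ChunkSHA1Requested, _peer_open_requests), not code from the repository.

    // budget_sketch.cpp -- illustration only, not code from the patch.
    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <unordered_map>

    // Mirrors the constant from chunk_picker.hpp (raised from 2 to 3 in this diff).
    static constexpr std::size_t max_tf_chunk_requests {3};

    // Budget = cap minus (running transfers + still-open requests), clamped at 0.
    // Mirrors: std::max<int64_t>(0, int64_t(max_tf_chunk_requests) - num_total).
    std::size_t requestBudget(std::size_t ongoing_transfers, std::size_t open_requests) {
        const std::int64_t num_total = std::int64_t(ongoing_transfers) + std::int64_t(open_requests);
        return std::size_t(std::max<std::int64_t>(0, std::int64_t(max_tf_chunk_requests) - num_total));
    }

    int main() {
        // Stand-in for FT1ChunkSHA1Requested: chunk index -> seconds since the
        // request went out (the real Entry also records which contact was asked).
        std::unordered_map<std::size_t, float> requested {
            {0, 14.5f}, // requested long ago
            {1,  3.0f}, // requested recently
        };

        // One tick later (delta = 1s): age every entry, drop those past the 15s
        // timeout, count the survivors -- this is what _peer_open_requests collects.
        const float delta = 1.0f;
        std::size_t open_requests = 0;
        for (auto it = requested.begin(); it != requested.end();) {
            it->second += delta;
            if (it->second >= 15.f) {
                it = requested.erase(it); // timed out; its budget slot frees up
            } else {
                ++open_requests;
                ++it;
            }
        }

        const std::size_t ongoing_transfers = 1;
        std::cout << "open requests after tick: " << open_requests << "\n"; // 1
        std::cout << "new requests allowed: "
                  << requestBudget(ongoing_transfers, open_requests) << "\n"; // 3 - (1+1) = 1
    }

The signed clamp is why the patched line casts through int64_t before subtracting: ongoing transfers plus open requests can exceed the cap, and performing that subtraction in size_t would wrap around to a huge value instead of yielding zero.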