diff --git a/src/ngcft1.cpp b/src/ngcft1.cpp index 9c3feae..8e36454 100644 --- a/src/ngcft1.cpp +++ b/src/ngcft1.cpp @@ -438,7 +438,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) { } bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) { - std::cout << "NGCFT1: FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << e.transfer_id << " [" << bin2hex(e.file_id) << "]\n"; + std::cout << "NGCFT1: FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << int(e.transfer_id) << " [" << bin2hex(e.file_id) << "]\n"; bool accept = false; dispatch( diff --git a/src/sha1_ngcft1.cpp b/src/sha1_ngcft1.cpp index 3288b63..5b43823 100644 --- a/src/sha1_ngcft1.cpp +++ b/src/sha1_ngcft1.cpp @@ -14,6 +14,8 @@ #include +#include + #include #include @@ -51,6 +53,14 @@ namespace Components { bool haveChunk(const SHA1Digest& hash) const; }; + struct SuspectedParticipants { + entt::dense_set participants; + }; + + struct ReRequestInfoTimer { + float timer {0.f}; + }; + } // Components std::optional Components::FT1ChunkSHA1Cache::chunkIndex(const SHA1Digest& hash) const { @@ -198,49 +208,45 @@ void SHA1_NGCFT1::iterate(float delta) { it++; } } + + { // requested info timers + std::vector timed_out; + _contentr.view().each([delta, &timed_out](Content e, Components::ReRequestInfoTimer& rrit) { + rrit.timer += delta; + + // 15sec, TODO: config + if (rrit.timer >= 15.f) { + timed_out.push_back(e); + } + }); + for (const auto e : timed_out) { + // TODO: avoid dups + _queue_content_want_info.push_back({_contentr, e}); + _contentr.remove(e); + } + } } // if we have not reached the total cap for transfers // count running transfers - size_t running_transfer_count {0}; + size_t running_sending_transfer_count {0}; for (const auto& [_, transfers] : _sending_transfers) { - running_transfer_count += transfers.size(); + running_sending_transfer_count += transfers.size(); + } + size_t running_receiving_transfer_count {0}; + for (const auto& [_, transfers] : _receiving_transfers) { + running_receiving_transfer_count += transfers.size(); } - if (running_transfer_count < _max_concurrent_out) { - // for each peer? transfer cap per peer? -#if 0 - // first check requests for info - if (!_queue_requested_info.empty()) { - // send init to _queue_requested_info - const auto [group_number, peer_number] = _queue_requested_info.front(); - if (_tcl.getGroupPeerConnectionStatus(group_number, peer_number) != TOX_CONNECTION_NONE) { - uint8_t transfer_id {0}; - - if (_tcl.sendFT1InitPrivate( - group_number, peer_number, - NGC_FT1_file_kind::HASH_SHA1_INFO, - _sha1_info_hash.data.data(), _sha1_info_hash.size(), // id (info hash) - _sha1_info_data.size(), // "file_size" - transfer_id - )) { - _transfers_requested_info.push_back({ - group_number, peer_number, - transfer_id, - 0.f - }); - - _queue_requested_info.pop_front(); - } - } - } else -#endif + if (running_sending_transfer_count < _max_concurrent_out) { + // TODO: for each peer? transfer cap per peer? + // TODO: info queue if (!_queue_requested_chunk.empty()) { // then check for chunk requests - const auto [group_number, peer_number, msg, chunk_hash, _] = _queue_requested_chunk.front(); + const auto [group_number, peer_number, ce, chunk_hash, _] = _queue_requested_chunk.front(); - auto chunk_idx_opt = msg.get().chunkIndex(chunk_hash); + auto chunk_idx_opt = ce.get().chunkIndex(chunk_hash); if (chunk_idx_opt.has_value()) { - const auto& info = msg.get(); + const auto& info = ce.get(); uint8_t transfer_id {0}; if (_nft.NGC_FT1_send_init_private( @@ -253,13 +259,106 @@ void SHA1_NGCFT1::iterate(float delta) { _sending_transfers [combineIds(group_number, peer_number)] [transfer_id] // TODO: also save index? - .v = SendingTransfer::Chunk{msg, chunk_idx_opt.value() * info.chunk_size}; + .v = SendingTransfer::Chunk{ce, chunk_idx_opt.value() * info.chunk_size}; } } // remove from queue regardless _queue_requested_chunk.pop_front(); } } + + if (running_receiving_transfer_count < _max_concurrent_in) { + // strictly priorize info + if (!_queue_content_want_info.empty()) { + const auto ce = _queue_content_want_info.front(); + + // make sure we are missing the info + assert(!ce.all_of()); + assert(!ce.all_of()); + assert(!ce.all_of()); + assert(!ce.all_of()); + assert(ce.all_of()); + + // get a list of peers we can request this file from + // TODO: randomly request from non SuspectedParticipants + std::vector> tox_peers; + for (const auto c : ce.get().participants) { + // TODO: sort by con state? + // prio to direct? + if (const auto* cs = _cr.try_get(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) { + continue; + } + + if (_cr.all_of(c)) { + const auto& tgpe = _cr.get(c); + tox_peers.push_back({tgpe.group_number, tgpe.peer_number}); + } + } + + // 1 in 20 chance to ask random peer instead + // TODO: config + tweak + // TODO: save group in content to avoid the tox_peers list build + if (tox_peers.empty() || (_rng()%20) == 0) { + // meh + // HACK: determain group based on last tox_peers + if (!tox_peers.empty()) { + const uint32_t group_number = tox_peers.back().first; + auto gch = _tcm.getContactGroup(group_number); + assert(static_cast(gch)); + + std::vector un_tox_peers; + for (const auto child : gch.get().subs) { + if (const auto* cs = _cr.try_get(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) { + continue; + } + + if (_cr.all_of(child)) { + const auto& tgpe = _cr.get(child); + un_tox_peers.push_back(tgpe.peer_number); + } + } + if (un_tox_peers.empty()) { + // no one online, we are out of luck + } else { + const size_t sample_i = _rng()%un_tox_peers.size(); + const auto peer_number = un_tox_peers.at(sample_i); + + //const auto& info = msg.get(); + const auto& info_hash = ce.get().hash; + + _nft.NGC_FT1_send_request_private( + group_number, peer_number, + static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), + info_hash.data(), info_hash.size() + ); + ce.emplace(0.f); + + _queue_content_want_info.pop_front(); + + std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << " (rng)\n"; + } + } + } else { + const size_t sample_i = _rng()%tox_peers.size(); + const auto [group_number, peer_number] = tox_peers.at(sample_i); + + //const auto& info = msg.get(); + const auto& info_hash = ce.get().hash; + + _nft.NGC_FT1_send_request_private( + group_number, peer_number, + static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), + info_hash.data(), info_hash.size() + ); + ce.emplace(0.f); + + _queue_content_want_info.pop_front(); + + std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n"; + } + } else if (!_queue_content_want_chunk.empty()) { + } + } } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { @@ -317,17 +416,22 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { return false; } - auto msg = _chunks.at(chunk_hash); + auto ce = _chunks.at(chunk_hash); + + { // they advertise interest in the content + const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); + ce.get_or_emplace().participants.emplace(c); + } assert(msg.all_of()); - if (!msg.get().haveChunk(chunk_hash)) { + if (!ce.get().haveChunk(chunk_hash)) { // we dont have the chunk return false; } // queue good request - queueUpRequestChunk(e.group_number, e.peer_number, msg, chunk_hash); + queueUpRequestChunk(e.group_number, e.peer_number, ce, chunk_hash); } else { assert(false && "unhandled case"); } @@ -411,6 +515,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) { return false; } + const auto& tv = peer_transfers[e.transfer_id].v; + if (std::holds_alternative(tv)) { + updateMessages(std::get(tv).content); // mostly for sent bytes + } peer_transfers.erase(e.transfer_id); return true; @@ -490,6 +598,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) { } ce.get_or_emplace().messages.push_back({reg, new_msg_e}); + ce.get_or_emplace().participants.emplace(c); + + if (!ce.all_of() && !ce.all_of()) { + // TODO: check if already receiving + _queue_content_want_info.push_back(ce); + } + // TODO: queue info dl //reg_ptr->emplace(e, sha1_info); @@ -598,9 +713,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std cc.have_count = sha1_info.chunks.size(); // need? _info_to_content[sha1_info_hash] = ce; - for (size_t i = 0; i < sha1_info.chunks.size(); i++) { - _chunks[sha1_info.chunks[i]] = ce; - cc.chunk_hash_to_index[sha1_info.chunks[i]] = i; + for (size_t i = sha1_info.chunks.size(); i > 0; i--) { + _chunks[sha1_info.chunks[i-1]] = ce; + // chunks can have more then 1 index ..., for now, build reverse and have the first index be the real index + cc.chunk_hash_to_index[sha1_info.chunks[i-1]] = i-1; } } diff --git a/src/sha1_ngcft1.hpp b/src/sha1_ngcft1.hpp index e910914..21ff09c 100644 --- a/src/sha1_ngcft1.hpp +++ b/src/sha1_ngcft1.hpp @@ -15,6 +15,7 @@ #include #include +#include enum class Content : uint32_t {}; using ContentRegistry = entt::basic_registry; @@ -26,6 +27,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { NGCFT1& _nft; ToxContactModel2& _tcm; + std::minstd_rand _rng {1337*11}; + // registry per group? ContentRegistry _contentr; @@ -64,10 +67,10 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { struct ReceivingTransfer { struct Info { + ContentHandle content; // copy of info data // too large? std::vector info_data; - // content? }; struct Chunk { @@ -84,6 +87,10 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { // key is groupid + peerid entt::dense_map> _receiving_transfers; + // makes request rotate around open content + std::deque _queue_content_want_info; + std::deque _queue_content_want_chunk; + static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number); void updateMessages(ContentHandle ce);