From 8237f902c94e357c1b7ef95181c7a778c0e36d7d Mon Sep 17 00:00:00 2001 From: Green Sky Date: Wed, 16 Aug 2023 01:43:12 +0200 Subject: [PATCH] refactor + wip chunk receive handling --- external/solanaceae_message3 | 2 +- src/ft1_sha1_info.cpp | 9 ++ src/ft1_sha1_info.hpp | 2 + src/sha1_ngcft1.cpp | 195 +++++++++++++++++++++-------------- src/sha1_ngcft1.hpp | 2 + 5 files changed, 129 insertions(+), 81 deletions(-) diff --git a/external/solanaceae_message3 b/external/solanaceae_message3 index d2d6cfb..c73e429 160000 --- a/external/solanaceae_message3 +++ b/external/solanaceae_message3 @@ -1 +1 @@ -Subproject commit d2d6cfbf5326de74cbac2dcd11be74df2cf79d2a +Subproject commit c73e429df138ce32f5cdb5588a3af8257a946680 diff --git a/src/ft1_sha1_info.cpp b/src/ft1_sha1_info.cpp index f809b93..0daaadb 100644 --- a/src/ft1_sha1_info.cpp +++ b/src/ft1_sha1_info.cpp @@ -28,6 +28,15 @@ std::ostream& operator<<(std::ostream& out, const SHA1Digest& v) { return out; } +size_t FT1InfoSHA1::chunkSize(size_t chunk_index) const { + if (chunk_index+1 == chunks.size()) { + // last chunk + return file_size - chunk_index * chunk_size; + } else { + return chunk_size; + } +} + std::vector FT1InfoSHA1::toBuffer(void) const { std::vector buffer; diff --git a/src/ft1_sha1_info.hpp b/src/ft1_sha1_info.hpp index c7c9bbe..17efca7 100644 --- a/src/ft1_sha1_info.hpp +++ b/src/ft1_sha1_info.hpp @@ -46,6 +46,8 @@ struct FT1InfoSHA1 { uint32_t chunk_size {128*1024}; // 128KiB for now std::vector chunks; + size_t chunkSize(size_t chunk_index) const; + std::vector toBuffer(void) const; void fromBuffer(const std::vector& buffer); }; diff --git a/src/sha1_ngcft1.cpp b/src/sha1_ngcft1.cpp index 48fa5a7..5ab317c 100644 --- a/src/sha1_ngcft1.cpp +++ b/src/sha1_ngcft1.cpp @@ -49,7 +49,6 @@ namespace Components { entt::dense_map chunk_hash_to_index; std::optional chunkIndex(const SHA1Digest& hash) const; - size_t chunkSize(size_t chunk_index) const; bool haveChunk(const SHA1Digest& hash) const; }; @@ -139,6 +138,64 @@ void SHA1_NGCFT1::updateMessages(ContentHandle ce) { } } +std::optional> SHA1_NGCFT1::selectPeerForRequest(ContentHandle ce) { + // get a list of peers we can request this file from + // TODO: randomly request from non SuspectedParticipants + std::vector> tox_peers; + for (const auto c : ce.get().participants) { + // TODO: sort by con state? + // prio to direct? + if (const auto* cs = _cr.try_get(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) { + continue; + } + + if (_cr.all_of(c)) { + const auto& tgpe = _cr.get(c); + tox_peers.push_back({tgpe.group_number, tgpe.peer_number}); + } + } + + // 1 in 20 chance to ask random peer instead + // TODO: config + tweak + // TODO: save group in content to avoid the tox_peers list build + if (tox_peers.empty() || (_rng()%20) == 0) { + // meh + // HACK: determain group based on last tox_peers + if (!tox_peers.empty()) { + const uint32_t group_number = tox_peers.back().first; + auto gch = _tcm.getContactGroup(group_number); + assert(static_cast(gch)); + + std::vector un_tox_peers; + for (const auto child : gch.get().subs) { + if (const auto* cs = _cr.try_get(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) { + continue; + } + + if (_cr.all_of(child)) { + const auto& tgpe = _cr.get(child); + un_tox_peers.push_back(tgpe.peer_number); + } + } + if (un_tox_peers.empty()) { + // no one online, we are out of luck + } else { + const size_t sample_i = _rng()%un_tox_peers.size(); + const auto peer_number = un_tox_peers.at(sample_i); + + return std::make_pair(group_number, peer_number); + } + } + } else { + const size_t sample_i = _rng()%tox_peers.size(); + const auto [group_number, peer_number] = tox_peers.at(sample_i); + + return std::make_pair(group_number, peer_number); + } + + return std::nullopt; +} + SHA1_NGCFT1::SHA1_NGCFT1( Contact3Registry& cr, RegistryMessageModel& rmm, @@ -295,68 +352,9 @@ void SHA1_NGCFT1::iterate(float delta) { assert(!ce.all_of()); assert(ce.all_of()); - // get a list of peers we can request this file from - // TODO: randomly request from non SuspectedParticipants - std::vector> tox_peers; - for (const auto c : ce.get().participants) { - // TODO: sort by con state? - // prio to direct? - if (const auto* cs = _cr.try_get(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) { - continue; - } - - if (_cr.all_of(c)) { - const auto& tgpe = _cr.get(c); - tox_peers.push_back({tgpe.group_number, tgpe.peer_number}); - } - } - - // 1 in 20 chance to ask random peer instead - // TODO: config + tweak - // TODO: save group in content to avoid the tox_peers list build - if (tox_peers.empty() || (_rng()%20) == 0) { - // meh - // HACK: determain group based on last tox_peers - if (!tox_peers.empty()) { - const uint32_t group_number = tox_peers.back().first; - auto gch = _tcm.getContactGroup(group_number); - assert(static_cast(gch)); - - std::vector un_tox_peers; - for (const auto child : gch.get().subs) { - if (const auto* cs = _cr.try_get(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) { - continue; - } - - if (_cr.all_of(child)) { - const auto& tgpe = _cr.get(child); - un_tox_peers.push_back(tgpe.peer_number); - } - } - if (un_tox_peers.empty()) { - // no one online, we are out of luck - } else { - const size_t sample_i = _rng()%un_tox_peers.size(); - const auto peer_number = un_tox_peers.at(sample_i); - - //const auto& info = msg.get(); - const auto& info_hash = ce.get().hash; - - _nft.NGC_FT1_send_request_private( - group_number, peer_number, - static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), - info_hash.data(), info_hash.size() - ); - ce.emplace(0.f); - - _queue_content_want_info.pop_front(); - - std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << " (rng)\n"; - } - } - } else { - const size_t sample_i = _rng()%tox_peers.size(); - const auto [group_number, peer_number] = tox_peers.at(sample_i); + auto selected_peer_opt = selectPeerForRequest(ce); + if (selected_peer_opt.has_value()) { + const auto [group_number, peer_number] = selected_peer_opt.value(); //const auto& info = msg.get(); const auto& info_hash = ce.get().hash; @@ -373,6 +371,18 @@ void SHA1_NGCFT1::iterate(float delta) { std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n"; } } else if (!_queue_content_want_chunk.empty()) { + const auto ce = _queue_content_want_chunk.front(); + + // select chunk/make sure we still need one + + auto selected_peer_opt = selectPeerForRequest(ce); + if (selected_peer_opt.has_value()) { + const auto [group_number, peer_number] = selected_peer_opt.value(); + + // ... + + _queue_content_want_chunk.pop_front(); + } } } } @@ -439,7 +449,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { ce.get_or_emplace().participants.emplace(c); } - assert(msg.all_of()); + assert(ce.all_of()); if (!ce.get().haveChunk(chunk_hash)) { // we dont have the chunk @@ -489,9 +499,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) { .v = ReceivingTransfer::Info{ce, std::vector(e.file_size)}; e.accept = true; + } else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) { } else { - // TODO assert - return false; + assert(false && "unhandled case"); } return true; @@ -513,9 +523,19 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) { for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) { info_data[i+e.data_offset] = e.data[i]; } + } else if (std::holds_alternative(tv)) { + const auto offset_into_file = std::get(tv).offset_into_file; + auto ce = std::get(tv).content; + + assert(ce.all_of()); + auto* file = ce.get().get(); + assert(file != nullptr); + + // TODO: avoid temporary copy + // TODO: check return + file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size}); } else { - // TODO: handle chunks - // error/ assert + assert(false && "unhandled case"); } return true; @@ -624,7 +644,32 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { updateMessages(ce); } else if (std::holds_alternative(tv)) { - updateMessages(std::get(tv).content); // mostly for received bytes + const auto offset_into_file = std::get(tv).offset_into_file; + auto ce = std::get(tv).content; + const auto& info = ce.get(); + auto& cc = ce.get(); + + const auto chunk_index = offset_into_file/info.chunk_size; + assert(chunk_index < info.chunks.size()); + const auto chunk_size = info.chunkSize(chunk_index); + assert(offset_into_file+chunk_size <= info.file_size); + + const auto chunk_data = ce.get()->read(offset_into_file, chunk_size); + + // check hash of chunk + auto got_hash = hash_sha1(chunk_data.data(), chunk_data.size()); + if (info.chunks.at(chunk_index) == got_hash) { + // TODO: check for have all + cc.have_chunk.at(chunk_index) = true; + + // good chunk + ce.get_or_emplace().total += chunk_data.size(); + } else { + // bad chunk + // TODO: requeue? + } + + updateMessages(ce); // mostly for received bytes } peer_transfers.erase(e.transfer_id); @@ -934,6 +979,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std // file id would be sha1_info hash or something //reg_ptr->emplace(e, file_id); + // remove? done in updateMessages() anyway if (ce.all_of()) { reg_ptr->emplace(msg_e, ce.get()); } @@ -947,17 +993,6 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std // TODO: determine if this is true //reg_ptr->emplace(e); -#if 0 - const auto friend_number = _cr.get(c).friend_number; - const auto&& [transfer_id, err] = _t.toxFileSend(friend_number, file_kind, file_impl->_file_size, file_id, file_name); - if (err == TOX_ERR_FILE_SEND_OK) { - reg_ptr->emplace(e, friend_number, transfer_id.value()); - // TODO: add tag signifying init sent status? - - toxFriendLookupAdd({*reg_ptr, e}); - } // else queue? -#endif - if (_cr.any_of(c)) { const uint32_t group_number = _cr.get(c).group_number; uint32_t message_id = 0; diff --git a/src/sha1_ngcft1.hpp b/src/sha1_ngcft1.hpp index 21ff09c..14f0634 100644 --- a/src/sha1_ngcft1.hpp +++ b/src/sha1_ngcft1.hpp @@ -95,6 +95,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { void updateMessages(ContentHandle ce); + std::optional> selectPeerForRequest(ContentHandle ce); + public: // TODO: config bool _udp_only {false};