From 853a54a9b509411aa2e8b2925ddccef5fc8d1c96 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Wed, 9 Aug 2023 23:02:29 +0200 Subject: [PATCH] sending chunks working --- src/plugin_ngcft1.cpp | 1 + src/sha1_ngcft1.cpp | 233 +++++++++++++++++++++++++++++++++++++++++- src/sha1_ngcft1.hpp | 32 +++++- 3 files changed, 261 insertions(+), 5 deletions(-) diff --git a/src/plugin_ngcft1.cpp b/src/plugin_ngcft1.cpp index 7d6766a..af2b201 100644 --- a/src/plugin_ngcft1.cpp +++ b/src/plugin_ngcft1.cpp @@ -100,6 +100,7 @@ SOLANA_PLUGIN_EXPORT void solana_plugin_tick(float delta) { //std::cout << "PLUGIN NGCEXT TICK()\n"; g_ngcft1->iterate(delta); + g_sha1_ngcft1->iterate(delta); } } // extern C diff --git a/src/sha1_ngcft1.cpp b/src/sha1_ngcft1.cpp index 36ef2f0..9b4b15b 100644 --- a/src/sha1_ngcft1.cpp +++ b/src/sha1_ngcft1.cpp @@ -26,8 +26,65 @@ namespace Components { std::vector hash; }; + struct FT1ChunkSHA1Cache { + std::vector have_chunk; + bool have_all {false}; + size_t have_count {0}; + entt::dense_map chunk_hash_to_index; + + std::optional chunkIndex(const SHA1Digest& hash) const; + size_t chunkSize(size_t chunk_index) const; + bool haveChunk(const SHA1Digest& hash) const; + }; + } // Components +std::optional Components::FT1ChunkSHA1Cache::chunkIndex(const SHA1Digest& hash) const { + const auto it = chunk_hash_to_index.find(hash); + if (it != chunk_hash_to_index.cend()) { + return it->second; + } else { + return std::nullopt; + } +} + +bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const { + if (have_all) { // short cut + return true; + } + + if (auto i_opt = chunkIndex(hash); i_opt.has_value()) { + return have_chunk[i_opt.value()]; + } + + // not part of this file + return false; +} + +static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) { + if (chunk_index+1 == sha1_info.chunks.size()) { + // last chunk + return sha1_info.file_size - chunk_index * sha1_info.chunk_size; + } else { + return sha1_info.chunk_size; + } +} + +void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, Message3Handle msg, const SHA1Digest& hash) { + // TODO: transfers + for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) { + // if already in queue + if (i_g == group_number && i_p == peer_number && i_h == hash) { + // update timer + i_t = 0.f; + return; + } + } + + // not in queue yet + _queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, msg, hash, 0.f)); +} + uint64_t SHA1_NGCFT1::combineIds(const uint32_t group_number, const uint32_t peer_number) { return (uint64_t(group_number) << 32) | peer_number; } @@ -55,6 +112,118 @@ SHA1_NGCFT1::SHA1_NGCFT1( _rmm.subscribe(this, RegistryMessageModel_Event::send_file_path); } +void SHA1_NGCFT1::iterate(float delta) { + { // timers + // chunk sending + for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) { + for (auto it = peer_it->second.begin(); it != peer_it->second.end();) { + it->second.time_since_activity += delta; + + // if we have not heard for 10sec, timeout + if (it->second.time_since_activity >= 10.f) { + //std::cerr << "SHA1_NGCFT1 warning: sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n"; + std::cerr << "SHA1_NGCFT1 warning: sending chunk tansfer timed out " << "." << int(it->first) << "\n"; + it = peer_it->second.erase(it); + } else { + it++; + } + } + + if (peer_it->second.empty()) { + // cleanup unused peers too agressive? + peer_it = _sending_transfers.erase(peer_it); + } else { + peer_it++; + } + } + //for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end();) { + //float& time_since_remove_activity = std::get(*it); + //time_since_remove_activity += delta; + + //// if we have not heard for 10sec, timeout + //if (time_since_remove_activity >= 10.f) { + //std::cerr << "SHA1 sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n"; + //it = _transfers_sending_chunk.erase(it); + //} else { + //it++; + //} + //} + + // queued requests + for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) { + float& timer = std::get(*it); + timer += delta; + + if (timer >= 10.f) { + it = _queue_requested_chunk.erase(it); + } else { + it++; + } + } + } + + // if we have not reached the total cap for transfers + // count running transfers + size_t running_transfer_count {0}; + for (const auto& [_, transfers] : _sending_transfers) { + running_transfer_count += transfers.size(); + } + if (running_transfer_count < _max_concurrent_out) { + // for each peer? transfer cap per peer? +#if 0 + // first check requests for info + if (!_queue_requested_info.empty()) { + // send init to _queue_requested_info + const auto [group_number, peer_number] = _queue_requested_info.front(); + + if (_tcl.getGroupPeerConnectionStatus(group_number, peer_number) != TOX_CONNECTION_NONE) { + uint8_t transfer_id {0}; + + if (_tcl.sendFT1InitPrivate( + group_number, peer_number, + NGC_FT1_file_kind::HASH_SHA1_INFO, + _sha1_info_hash.data.data(), _sha1_info_hash.size(), // id (info hash) + _sha1_info_data.size(), // "file_size" + transfer_id + )) { + _transfers_requested_info.push_back({ + group_number, peer_number, + transfer_id, + 0.f + }); + + _queue_requested_info.pop_front(); + } + } + } else +#endif + if (!_queue_requested_chunk.empty()) { // then check for chunk requests + const auto [group_number, peer_number, msg, chunk_hash, _] = _queue_requested_chunk.front(); + + auto chunk_idx_opt = msg.get().chunkIndex(chunk_hash); + if (chunk_idx_opt.has_value()) { + const auto& info = msg.get(); + + uint8_t transfer_id {0}; + if (_nft.NGC_FT1_send_init_private( + group_number, peer_number, + static_cast(NGCFT1_file_kind::HASH_SHA1_CHUNK), + chunk_hash.data.data(), chunk_hash.size(), + chunkSize(info, chunk_idx_opt.value()), + &transfer_id + )) { + _sending_transfers + [combineIds(group_number, peer_number)] + [transfer_id] // TODO: also save index? + .v = SendingTransfer::Chunk{msg, chunk_idx_opt.value() * info.chunk_size}; + } + } + // remove from queue regardless + _queue_requested_chunk.pop_front(); + } + } +} + bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { // only interested in sha1 if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) { @@ -79,6 +248,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { assert(msg.all_of()); + // TODO: queue instead + //queueUpRequestInfo(e.group_number, e.peer_number, info_hash); uint8_t transfer_id {0}; _nft.NGC_FT1_send_init_private( e.group_number, e.peer_number, @@ -92,9 +263,35 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { [combineIds(e.group_number, e.peer_number)] [transfer_id] .v = SendingTransfer::Info{msg.get().data}; + } else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) { + if (e.file_id_size != 20) { + // error + return false; + } + + SHA1Digest chunk_hash{e.file_id, e.file_id_size}; + + if (!_chunks.count(chunk_hash)) { + // we dont know about this + return false; + } + + auto msg = _chunks.at(chunk_hash); + + assert(msg.all_of()); + + if (!msg.get().haveChunk(chunk_hash)) { + // we dont have the chunk + return false; + } + + // queue good request + queueUpRequestChunk(e.group_number, e.peer_number, msg, chunk_hash); + } else { + assert(false && "unhandled case"); } - return false; + return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) { @@ -132,8 +329,28 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { // was last read (probably TODO: add transfer destruction event) peer.erase(e.transfer_id); } + } else if (std::holds_alternative(transfer.v)) { + auto& chunk_transfer = std::get(transfer.v); + const auto data = chunk_transfer.msg.get()->read(chunk_transfer.offset_into_file + e.data_offset, e.data_size); + + // TODO: optimize + for (size_t i = 0; i < e.data_size && i < data.size(); i++) { + e.data[i] = data[i]; + } + + chunk_transfer.msg.get_or_emplace().total += data.size(); + //_rmm.throwEventUpdate(transfer); // should we? + + //if (e.data_offset + e.data_size >= *insert chunk size here*) { + //// was last read (probably TODO: add transfer destruction event) + //peer.erase(e.transfer_id); + //} + } else { + assert(false && "not implemented?"); } + transfer.time_since_activity = 0.f; + return true; } @@ -210,7 +427,19 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std reg_ptr->emplace(e, sha1_info); reg_ptr->emplace(e, sha1_info_data); // keep around? or file? reg_ptr->emplace(e, sha1_info_hash); - _info_to_message[sha1_info_hash] = {*reg_ptr, e}; + { // lookup tables and have + auto& cc = reg_ptr->emplace(e); + cc.have_all = true; + // skip have vec, since all + //cc.have_chunk + cc.have_count = sha1_info.chunks.size(); // need? + + _info_to_message[sha1_info_hash] = {*reg_ptr, e}; + for (size_t i = 0; i < sha1_info.chunks.size(); i++) { + _chunks[sha1_info.chunks[i]] = {*reg_ptr, e}; + cc.chunk_hash_to_index[sha1_info.chunks[i]] = i; + } + } //reg_ptr->emplace(e, file_kind); // file id would be sha1_info hash or something diff --git a/src/sha1_ngcft1.hpp b/src/sha1_ngcft1.hpp index 3592c94..dd92b1f 100644 --- a/src/sha1_ngcft1.hpp +++ b/src/sha1_ngcft1.hpp @@ -9,7 +9,8 @@ #include "./ngcft1.hpp" #include "./ft1_sha1_info.hpp" -#include "entt/container/dense_map.hpp" + +#include #include @@ -19,9 +20,19 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { NGCFT1& _nft; ToxContactModel2& _tcm; + // limit this to each group? entt::dense_map _info_to_message; + // sha1 chunk index + // TODO: optimize lookup + entt::dense_map _chunks; + + // group_number, peer_number, message, chunk_hash, timer + std::deque> _queue_requested_chunk; + //void queueUpRequestInfo(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash); + void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, Message3Handle msg, const SHA1Digest& hash); + struct SendingTransfer { struct Info { // copy of info data @@ -29,13 +40,28 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { std::vector info_data; }; - std::variant v; + struct Chunk { + Message3Handle msg; + uint64_t offset_into_file; + // or data? + // if memmapped, this would be just a pointer + }; + + std::variant v; + + float time_since_activity {0.f}; }; // key is groupid + peerid entt::dense_map> _sending_transfers; static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number); + public: // TODO: config + bool _udp_only {false}; + + size_t _max_concurrent_in {8}; + size_t _max_concurrent_out {4}; + public: SHA1_NGCFT1( Contact3Registry& cr, @@ -44,7 +70,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { ToxContactModel2& tcm ); - //void iterate(float delta); + void iterate(float delta); protected: // events bool onEvent(const Events::NGCFT1_recv_request&) override;