diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
index adc8cc8..2893c42 100644
--- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
+++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
@@ -176,6 +176,45 @@ std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(O
 	return std::nullopt;
 }
 
+void SHA1_NGCFT1::queueBitsetSendFull(Contact3Handle c, ObjectHandle o) {
+	if (!static_cast<bool>(c) || !static_cast<bool>(o)) {
+		assert(false);
+		return;
+	}
+
+	// TODO: only queue if not already sent??
+
+
+	if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
+		return;
+	}
+
+	const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
+	const auto& info = o.get<Components::FT1InfoSHA1>();
+	const auto total_chunks = info.chunks.size();
+
+	static constexpr size_t bits_per_packet {8u*512u};
+
+	for (size_t i = 0; i < total_chunks; i += bits_per_packet) {
+		size_t bits_this_packet = std::min(bits_per_packet, total_chunks-i);
+
+		BitSet have(bits_this_packet); // default init to zero
+		if (cc.have_all) {
+			// TODO: send have_all packet instead
+			have.invert(); // we "have_all"
+		} else {
+			// TODO: optimize selective copy bitset
+			for (size_t j = i; j < i+bits_this_packet; j++) {
+				if (cc.have_chunk[j]) {
+					have.set(j-i);
+				}
+			}
+		}
+
+		_queue_send_bitset.push_back(QBitsetEntry{c, o, i, have});
+	}
+}
+
 SHA1_NGCFT1::SHA1_NGCFT1(
 	ObjectStore2& os,
 	Contact3Registry& cr,
@@ -308,6 +347,30 @@ float SHA1_NGCFT1::iterate(float delta) {
 		}
 	}
 
+	{ // send out bitsets
+		// currently 1 per tick
+		if (!_queue_send_bitset.empty()) {
+			const auto& qe = _queue_send_bitset.front();
+
+			// TODO: build bitset inplace instead, to not miss any chunks that arrived this tick
+			if (qe.c.all_of<Contact::Components::ToxGroupPeerEphemeral>() && qe.o.all_of<Components::FT1InfoSHA1Hash>()) {
+				const auto [group_number, peer_number] = qe.c.get<Contact::Components::ToxGroupPeerEphemeral>();
+				const auto& info_hash = qe.o.get<Components::FT1InfoSHA1Hash>().hash;
+
+				// TODO: only pop if sent?
+				_neep.send_ft1_bitset(
+					group_number, peer_number,
+					static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
+					info_hash.data(), info_hash.size(),
+					qe.start_index,
+					qe.have._bytes.data(), qe.have.size_bytes()
+				);
+			}
+
+			_queue_send_bitset.pop_front();
+		}
+	}
+
 	// if we have not reached the total cap for transfers
 	// count running transfers
 	size_t running_sending_transfer_count {0};
@@ -1182,22 +1245,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
 			//ce.emplace<Components::FT1InfoSHA1>(sha1_info);
 			//ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
 			ce.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
-			//{ // lookup tables and have
-				//auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
-				//cc.have_all = true;
-				//// skip have vec, since all
-				////cc.have_chunk
-				//cc.have_count = sha1_info.chunks.size(); // need?
-
-				//_info_to_content[sha1_info_hash] = ce;
-				//for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
-					//_chunks[sha1_info.chunks[i]] = ce;
-					//cc.chunk_hash_to_index[sha1_info.chunks[i]] = i;
-				//}
-			//}
-
-			// TODO: ft1 specific comp
-			//ce.emplace(std::move(file_impl));
 		}
 		ce.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
 		reg_ptr->emplace<Message::Components::MessageFileObject>(new_msg_e, ce);
@@ -1638,7 +1685,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
 }
 
 bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
-	std::cerr << "SHA1_NGCFT1: FT1_BITSET o:" << e.start_chunk << " s:" << e.chunk_bitset.size() << "\n";
+	std::cerr << "SHA1_NGCFT1: got FT1_BITSET o:" << e.start_chunk << " s:" << e.chunk_bitset.size()*8 << "\n";
 
 	if (e.file_kind != static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO)) {
 		return false;
@@ -1665,9 +1712,10 @@
 	}
 
 	const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
-	// +1 for byte rounding
-	if (num_total_chunks+1 < e.start_chunk + (e.chunk_bitset.size()*8)) {
+	// +7 for byte rounding
+	if (num_total_chunks+7 < e.start_chunk + (e.chunk_bitset.size()*8)) {
 		std::cerr << "SHA1_NGCFT1 error: got bitset.size+start that is larger than number of chunks!!\n";
+		std::cerr << "total:" << num_total_chunks << " start:" << e.start_chunk << " size:" << e.chunk_bitset.size()*8 << "\n";
 		return false;
 	}
 
@@ -1750,11 +1798,15 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) {
 	auto o = itc_it->second;
 
 	if (addParticipation(c, o)) {
 		// something happened, update chunk picker
+		// !!! this is probably too much
 		assert(static_cast<bool>(c));
 		c.emplace_or_replace<ChunkPickerUpdateTag>();
 		std::cout << "SHA1_NGCFT1: and we were interested!\n";
 		// we should probably send the bitset back here / add to queue (can be multiple packets)
+		if (o.all_of<Components::FT1ChunkSHA1Cache>() && o.get<Components::FT1ChunkSHA1Cache>().have_count > 0) {
+			queueBitsetSendFull(c, o);
+		}
 	}
 
 	return false;
diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
index de46ee1..8b8ba47 100644
--- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
+++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include <solanaceae/util/bitset.hpp>
 
 #include
@@ -74,6 +75,15 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
 	// makes requests rotate around open content
 	std::deque<ObjectHandle> _queue_content_want_info;
 
+	struct QBitsetEntry {
+		Contact3Handle c;
+		ObjectHandle o;
+		size_t start_index;
+		//size_t size;
+		BitSet have;
+	};
+	std::deque<QBitsetEntry> _queue_send_bitset;
+
 	// workaround missing contact events
 	// only used to remove participation on peer exit
 	entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
@@ -87,6 +97,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
 
 	std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ObjectHandle ce);
 
+	void queueBitsetSendFull(Contact3Handle c, ObjectHandle o);
+
 	public: // TODO: config
 		bool _udp_only {false};
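
Note on the BitSet dependency: the queueing and send paths above lean on a small bitset helper, which is what the new solanaceae/util/bitset.hpp include brings in. A minimal sketch of the interface the patch actually touches (zero-initialized construction, set(), invert(), size_bytes(), and the raw _bytes storage handed to send_ft1_bitset()) could look like the following; this is an assumption for illustration, not the real class, which may differ in bit order and exact members.

// Minimal sketch, NOT the real solanaceae/util/bitset.hpp.
// The LSB-first bit order used here is an assumption.
#include <cstddef>
#include <cstdint>
#include <vector>

struct BitSet {
	std::vector<uint8_t> _bytes; // public: the send path reads the raw bytes directly
	std::size_t _size_bits {0};

	explicit BitSet(std::size_t size_bits) :
		_bytes((size_bits+7)/8, 0x00), // zero init, rounded up to whole bytes
		_size_bits(size_bits)
	{}

	void set(std::size_t i) { _bytes[i/8] |= uint8_t(1u << (i%8)); }
	bool get(std::size_t i) const { return (_bytes[i/8] >> (i%8)) & 1u; }

	// flips every bit; used for the "have_all" fast path
	void invert(void) {
		for (auto& b : _bytes) {
			b = uint8_t(~b);
		}
	}

	std::size_t size(void) const { return _size_bits; } // in bits
	std::size_t size_bytes(void) const { return _bytes.size(); }
};

Note that an invert() along these lines also sets the padding bits in the last byte, so a peer answering a have_all object can overshoot the real chunk count by a few bits, which is exactly what the receive-side "+7" tolerance in the cpp hunk has to absorb.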
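The +1 to +7 change in the bitset receive handler is ceil-division arithmetic: a bitset covering N chunks travels as ceil(N/8) whole bytes, so the receiver can see up to 7 padding bits beyond the true chunk count, and the old bound wrongly rejected valid packets whenever more than one padding bit was present. A small standalone check, with hypothetical numbers not taken from the patch:

// Demonstrates why the bound must allow 7 bits of byte-rounding slack.
#include <cassert>
#include <cstddef>

int main(void) {
	const std::size_t num_total_chunks = 1001; // 1001 % 8 == 1  ->  7 padding bits
	const std::size_t start_chunk = 0;

	const std::size_t bytes_on_wire = (num_total_chunks + 7) / 8; // ceil(1001/8) == 126
	const std::size_t bits_received = bytes_on_wire * 8;          // 1008 == 1001 + 7

	// old check: flags this perfectly valid full-coverage bitset as too large
	assert(num_total_chunks + 1 < start_chunk + bits_received);    // 1002 < 1008 -> reject

	// fixed check: accepts it, since the overshoot is at most 7 padding bits
	assert(!(num_total_chunks + 7 < start_chunk + bits_received)); // 1008 < 1008 is false -> accept

	return 0;
}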