diff --git a/CMakeLists.txt b/CMakeLists.txt
index ca80a0c..1ff7b0d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -59,6 +59,8 @@ add_library(solanaceae_sha1_ngcft1
 	./solanaceae/ngc_ft1_sha1/components.hpp
 	./solanaceae/ngc_ft1_sha1/components.cpp
 
+	./solanaceae/ngc_ft1_sha1/contact_components.hpp
+
 	./solanaceae/ngc_ft1_sha1/chunk_picker.hpp
 	./solanaceae/ngc_ft1_sha1/chunk_picker.cpp
diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
index fa9db41..2a2c46d 100644
--- a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
+++ b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
@@ -3,6 +3,7 @@
 #include <solanaceae/tox_contacts/components.hpp>
 
 #include "./components.hpp"
+#include "./contact_components.hpp"
 
 #include <iostream>
@@ -111,6 +112,64 @@ void ChunkPicker::updateParticipation(
 	Contact3Handle c,
 	ObjectRegistry& objreg
 ) {
+	if (!c.all_of<Contact::Components::FT1Participation>()) {
+		participating_unfinished.clear();
+		return;
+	}
+
+	entt::dense_set<Object> checked;
+	for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) {
+		const ObjectHandle o {objreg, ov};
+
+		if (participating_unfinished.contains(o)) {
+			if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
+				participating_unfinished.erase(o);
+				continue;
+			}
+
+			if (o.all_of<Message::Components::Transfer::TagPaused>()) {
+				participating_unfinished.erase(o);
+				continue;
+			}
+
+			if (o.get<Components::FT1ChunkSHA1Cache>().have_all) {
+				participating_unfinished.erase(o);
+			}
+		} else {
+			if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
+				continue;
+			}
+
+			if (o.all_of<Message::Components::Transfer::TagPaused>()) {
+				continue;
+			}
+
+			if (!o.get<Components::FT1ChunkSHA1Cache>().have_all) {
+				participating_unfinished.emplace(o, ParticipationEntry{});
+			}
+		}
+		checked.emplace(o);
+	}
+
+	// now we still need to remove left over unfinished.
+	// TODO: how did they get left over
+	entt::dense_set<Object> to_remove;
+	for (const auto& [o, _] : participating_unfinished) {
+		if (!checked.contains(o)) {
+			std::cerr << "unfinished contained non participating\n";
+			to_remove.emplace(o);
+		}
+	}
+	for (const auto& o : to_remove) {
+		participating_unfinished.erase(o);
+	}
+}
+
+void ChunkPicker::validateParticipation(
+	Contact3Handle c,
+	ObjectRegistry& objreg
+) {
+#if 0
 	// replaces them in place
 	participating.clear();
 	participating_unfinished.clear();
@@ -128,6 +187,40 @@ void ChunkPicker::updateParticipation(
 			participating_unfinished.emplace(o, ParticipationEntry{});
 		}
 	}
+#endif
+	entt::dense_set<Object> val_p;
+	//entt::dense_set<Object> val_pu;
+	//participating_unfinished.clear(); // HACK: rn this updates unfinished o.o
+
+	for (const Object ov : objreg.view<Components::SuspectedParticipants>()) {
+		const ObjectHandle o {objreg, ov};
+
+		val_p.emplace(o);
+
+		//if (!o.all_of<Components::FT1ChunkSHA1Cache>()) {
+		//	continue;
+		//}
+
+		//if (!o.get<Components::FT1ChunkSHA1Cache>().have_all) {
+		//	//val_pu.emplace(o);
+		//	//participating_unfinished.emplace(o, ParticipationEntry{});
+		//}
+	}
+
+	// validate
+	if (c.all_of<Contact::Components::FT1Participation>()) {
+		const auto& participating = c.get<Contact::Components::FT1Participation>().participating;
+		assert(val_p.size() == participating.size());
+		//assert(val_pu.size() == participating_unfinished.size());
+
+		for (const auto& it : val_p) {
+			assert(participating.contains(it));
+		}
+	}
+
+	//for (const auto& it : val_pu) {
+	//	assert(participating_unfinished.contains(it));
+	//}
 }
@@ -146,6 +239,8 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
 	}
 	const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
 
+	updateParticipation(c, objreg);
+
 	if (participating_unfinished.empty()) {
		participating_in_last = entt::null;
		return {};
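
Note on the updateParticipation change above: stale entries are collected into to_remove and only erased after the loop, because erasing from the entt::dense_map while iterating it would invalidate the iterator. A minimal standalone sketch of that deferred-erase reconcile pattern, using plain std containers instead of entt and with all names hypothetical:

#include <iostream>
#include <unordered_map>
#include <unordered_set>

// reconcile "tracked" against the authoritative "participating" set:
// add missing entries while iterating the authoritative set, then erase
// leftovers in a separate pass, so no container is mutated mid-iteration.
void reconcile(
	std::unordered_map<int, int>& tracked, // object -> entry (stand-ins)
	const std::unordered_set<int>& participating
) {
	std::unordered_set<int> checked;
	for (const int o : participating) {
		tracked.try_emplace(o, 0); // ParticipationEntry{} stand-in
		checked.emplace(o);
	}

	// collect first, erase after; erasing inside the loop would invalidate it
	std::unordered_set<int> to_remove;
	for (const auto& [o, _] : tracked) {
		if (!checked.contains(o)) {
			to_remove.emplace(o);
		}
	}
	for (const int o : to_remove) {
		tracked.erase(o);
	}
}

int main(void) {
	std::unordered_map<int, int> tracked {{1, 0}, {9, 0}};
	reconcile(tracked, {1, 2, 3});
	std::cout << tracked.size() << "\n"; // 3: {1, 2, 3}, the 9 was removed
}
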
diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.hpp b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp
index e171eb9..9854ac3 100644
--- a/solanaceae/ngc_ft1_sha1/chunk_picker.hpp
+++ b/solanaceae/ngc_ft1_sha1/chunk_picker.hpp
@@ -19,6 +19,13 @@
 // goal is to always keep 2 transfers running and X(6) requests queued up
 // per peer
 
+struct ChunkPickerUpdateTag {};
+
+struct ChunkPickerTimer {
+	// adds update tag on 0
+	float timer {0.f};
+};
+
 // contact component?
 struct ChunkPicker {
 	// max transfers
@@ -41,13 +48,20 @@ struct ChunkPicker {
 	};
 	// TODO: only unfinished?
 	entt::dense_map<Object, ParticipationEntry> participating_unfinished;
-	entt::dense_set<Object> participating;
 	Object participating_in_last {entt::null};
 
+	private:
+	// updates participating_unfinished
 	void updateParticipation(
 		Contact3Handle c,
 		ObjectRegistry& objreg
 	);
+	public:
+
+	void validateParticipation(
+		Contact3Handle c,
+		ObjectRegistry& objreg
+	);
 
 	// tick
 	//void sendInfoRequests();
diff --git a/solanaceae/ngc_ft1_sha1/contact_components.hpp b/solanaceae/ngc_ft1_sha1/contact_components.hpp
new file mode 100644
index 0000000..2c69e4c
--- /dev/null
+++ b/solanaceae/ngc_ft1_sha1/contact_components.hpp
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <solanaceae/object_store/object_store.hpp>
+#include <entt/container/dense_set.hpp>
+
+namespace Contact::Components {
+
+struct FT1Participation {
+	entt::dense_set<Object> participating;
+};
+
+} // Contact::Components
+
diff --git a/solanaceae/ngc_ft1_sha1/participation.cpp b/solanaceae/ngc_ft1_sha1/participation.cpp
index 8cf6213..0e1295d 100644
--- a/solanaceae/ngc_ft1_sha1/participation.cpp
+++ b/solanaceae/ngc_ft1_sha1/participation.cpp
@@ -1,9 +1,14 @@
 #include "./participation.hpp"
+#include "./contact_components.hpp"
 #include "./chunk_picker.hpp"
 
+#include <iostream>
+
 bool addParticipation(Contact3Handle c, ObjectHandle o) {
 	bool was_new {false};
+	assert(static_cast<bool>(o));
+	assert(static_cast<bool>(c));
 
 	if (static_cast<bool>(o)) {
 		const auto [_, inserted] = o.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
@@ -11,24 +16,33 @@ bool addParticipation(Contact3Handle c, ObjectHandle o) {
 	}
 
 	if (static_cast<bool>(c)) {
-		const auto [_, inserted] = c.get_or_emplace<ChunkPicker>().participating.emplace(o);
+		const auto [_, inserted] = c.get_or_emplace<Contact::Components::FT1Participation>().participating.emplace(o);
 		was_new = was_new || inserted;
-
-		// TODO: if not have_all
-		c.get_or_emplace<ChunkPicker>().participating_unfinished.emplace(o, ChunkPicker::ParticipationEntry{});
 	}
 
+	std::cout << "added " << (was_new?"new ":"") << "participant\n";
+
 	return was_new;
 }
 
 void removeParticipation(Contact3Handle c, ObjectHandle o) {
+	assert(static_cast<bool>(o));
+	assert(static_cast<bool>(c));
+
 	if (static_cast<bool>(o) && o.all_of<Components::SuspectedParticipants>()) {
 		o.get<Components::SuspectedParticipants>().participants.erase(c);
 	}
 
-	if (static_cast<bool>(c) && c.all_of<ChunkPicker>()) {
-		c.get<ChunkPicker>().participating.erase(o);
-		c.get<ChunkPicker>().participating_unfinished.erase(o);
+	if (static_cast<bool>(c)) {
+		if (c.all_of<Contact::Components::FT1Participation>()) {
+			c.get<Contact::Components::FT1Participation>().participating.erase(o);
+		}
+
+		if (c.all_of<ChunkPicker>()) {
+			c.get<ChunkPicker>().participating_unfinished.erase(o);
+		}
 	}
+
+	std::cout << "removed participant\n";
 }
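
Note: the sha1_ngcft1.cpp changes that follow repeat the same sequence after every addParticipation call: when a new participant appears, tag the contact so the chunk picker re-evaluates it. A small helper could deduplicate that. A possible sketch, not part of this patch, with the helper name hypothetical:

#include <cassert>

#include "./participation.hpp"
#include "./chunk_picker.hpp" // ChunkPickerUpdateTag

// wraps the repeated "if (addParticipation(...)) { tag }" sequence
static void addParticipationAndNotify(Contact3Handle c, ObjectHandle o) {
	assert(static_cast<bool>(c));
	if (addParticipation(c, o)) {
		// something happened, make iterate() re-run the picker for c
		c.emplace_or_replace<ChunkPickerUpdateTag>();
	}
}
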
diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
index 160c392..379f237 100644
--- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
+++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
@@ -19,6 +19,7 @@
 #include "./file_rw_mapped.hpp"
 
 #include "./components.hpp"
+#include "./contact_components.hpp"
 #include "./chunk_picker.hpp"
 #include "./participation.hpp"
@@ -217,7 +218,7 @@ SHA1_NGCFT1::SHA1_NGCFT1(
 }
 
 float SHA1_NGCFT1::iterate(float delta) {
-	std::cerr << "---------- new tick ----------\n";
+	//std::cerr << "---------- new tick ----------\n";
 	// info builder queue
 	if (_info_builder_dirty) {
 		std::lock_guard l{_info_builder_queue_mutex};
@@ -392,26 +393,59 @@ float SHA1_NGCFT1::iterate(float delta) {
 				std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
 			}
 		}
+	}
 
-		// new chunk picker code
+	// ran regardless of _max_concurrent_in
+	{ // new chunk picker code
 		// HACK: work around missing contact events
+		std::vector<Contact3Handle> c_offline_to_remove;
 		std::vector<Contact3Handle> cp_to_remove;
-		_cr.view<ChunkPicker>().each([this, &_peer_open_requests, &cp_to_remove](const Contact3 cv, ChunkPicker& cp) {
+
+		// first, update timers
+		_cr.view<ChunkPickerTimer>().each([this, delta](const Contact3 cv, ChunkPickerTimer& cpt) {
+			cpt.timer -= delta;
+			if (cpt.timer <= 0.f) {
+				_cr.emplace_or_replace<ChunkPickerUpdateTag>(cv);
+			}
+		});
+
+		//std::cout << "number of chunkpickers: " << _cr.storage<ChunkPicker>().size() << ", of which " << _cr.storage<ChunkPickerUpdateTag>().size() << " need updating\n";
+
+		// now check for potentially missing cp
+		auto cput_view = _cr.view<ChunkPickerUpdateTag>();
+		cput_view.each([this, &c_offline_to_remove](const Contact3 cv) {
 			Contact3Handle c{_cr, cv};
-			if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
-				cp_to_remove.push_back(c);
+			//std::cout << "cput :)\n";
+
+			if (!c.any_of<Contact::Components::ToxGroupPeerEphemeral>()) {
+				std::cout << "cput uh nuh :(\n";
+				c_offline_to_remove.push_back(c);
 				return;
 			}
 
+			if (!c.all_of<ChunkPicker>()) {
+				std::cout << "creating new cp!!\n";
+				c.emplace<ChunkPicker>();
+				c.emplace_or_replace<ChunkPickerTimer>();
+			}
+		});
+
+		// now update all cp that are tagged
+		_cr.view<ChunkPicker, ChunkPickerUpdateTag>().each([this, &_peer_open_requests, &cp_to_remove, &c_offline_to_remove](const Contact3 cv, ChunkPicker& cp) {
+			Contact3Handle c{_cr, cv};
+
+			if (!c.any_of<Contact::Components::ToxGroupPeerEphemeral>()) {
+				//cp_to_remove.push_back(c);
+				c_offline_to_remove.push_back(c);
+				return;
+			}
+
+			//std::cout << "cpu :)\n";
+
 			// HACK: expensive, dont do every tick, only on events
 			// do verification in debug instead?
-			cp.updateParticipation(
-				c,
-				_os.registry()
-			);
-
-			assert(!cp.participating.empty());
+			//cp.validateParticipation(c, _os.registry());
 
 			size_t peer_open_request = 0;
 			if (_peer_open_requests.contains(c)) {
@@ -426,6 +460,15 @@ float SHA1_NGCFT1::iterate(float delta) {
 			);
 
 			if (new_requests.empty()) {
+				// updateChunkRequests updates the unfinished
+				// TODO: pull out and check there?
+				if (cp.participating_unfinished.empty()) {
+					std::cout << "destroying empty useless cp\n";
+					cp_to_remove.push_back(c);
+				} else {
+					c.get_or_emplace<ChunkPickerTimer>().timer = 60.f;
+				}
+
 				return;
 			}
@@ -444,10 +487,21 @@ float SHA1_NGCFT1::iterate(float delta) {
 				);
 				std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(r_idx) << "] from " << group_number << ":" << peer_number << "\n";
 			}
+
+			// force update every minute
+			// TODO: add small random bias to spread load
+			c.get_or_emplace<ChunkPickerTimer>().timer = 60.f;
 		});
 
+		// unmark all marked
+		_cr.clear<ChunkPickerUpdateTag>();
+		assert(_cr.storage<ChunkPickerUpdateTag>().empty());
+
 		for (const auto& c : cp_to_remove) {
-			c.remove<ChunkPicker>();
+			c.remove<ChunkPicker, ChunkPickerTimer>();
+		}
+		for (const auto& c : c_offline_to_remove) {
+			c.remove<ChunkPicker, ChunkPickerTimer>();
 
 			for (const auto& [_, o] : _info_to_content) {
 				removeParticipation(c, o);
@@ -557,11 +611,6 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 				cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
 			}
 		}
-
-		if (!cc.have_all) {
-			// now, enque
-			_queue_content_want_chunk.push_back(ce);
-		}
 	}
 
 	ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
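
Note: the iterate() changes above replace the unconditional per-tick updateParticipation call with a timer-driven dirty-tag pattern: ChunkPickerTimer counts down, adds ChunkPickerUpdateTag on zero, only tagged contacts get the expensive update, and all tags are cleared at the end of the tick. A self-contained sketch of the same pattern, with simplified entities and hypothetical component names:

#include <entt/entt.hpp>

// illustrative stand-ins for ChunkPickerUpdateTag / ChunkPickerTimer
struct UpdateTag {};
struct UpdateTimer { float timer {0.f}; };

static void tick(entt::registry& reg, const float delta) {
	// 1. count down all timers, tag whoever reached zero
	reg.view<UpdateTimer>().each([&reg, delta](const entt::entity e, UpdateTimer& t) {
		t.timer -= delta;
		if (t.timer <= 0.f) {
			reg.emplace_or_replace<UpdateTag>(e);
		}
	});

	// 2. run the expensive update only for tagged entities
	reg.view<UpdateTag>().each([&reg](const entt::entity e) {
		// ... update participation / chunk requests here ...
		reg.emplace_or_replace<UpdateTimer>(e, UpdateTimer{60.f}); // rearm
	});

	// 3. unmark all marked
	reg.clear<UpdateTag>();
}

int main(void) {
	entt::registry reg;
	const auto e = reg.create();
	reg.emplace<UpdateTimer>(e, 1.f);
	tick(reg, 2.f); // timer hits zero -> tagged -> updated -> rearmed to 60s
}
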
@@ -596,6 +645,16 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 
 	ce.remove<Message::Components::Transfer::TagPaused>();
 
+	// start requesting from all participants
+	if (ce.all_of<Components::SuspectedParticipants>()) {
+		std::cout << "accepted ft has " << ce.get<Components::SuspectedParticipants>().participants.size() << " sp\n";
+		for (const auto cv : ce.get<Components::SuspectedParticipants>().participants) {
+			_cr.emplace_or_replace<ChunkPickerUpdateTag>(cv);
+		}
+	} else {
+		std::cout << "accepted ft has NO sp!\n";
+	}
+
 	// should?
 	e.e.remove<Message::Components::Transfer::TagPaused>();
@@ -663,7 +722,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
 
 	{ // they advertise interest in the content
 		const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
-		addParticipation(c, o);
+		if (addParticipation(c, o)) {
+			// something happened, update chunk picker
+			assert(static_cast<bool>(c));
+			c.emplace_or_replace<ChunkPickerUpdateTag>();
+		}
 	}
 
 	assert(o.all_of<Components::FT1ChunkSHA1Cache>());
@@ -729,7 +792,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
 
 	{ // they have the content (probably, might be fake, should move this to done)
 		const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
-		addParticipation(c, o);
+		if (addParticipation(c, o)) {
+			// something happened, update chunk picker
+			assert(static_cast<bool>(c));
+			c.emplace_or_replace<ChunkPickerUpdateTag>();
+		}
 	}
 
 	assert(o.all_of<Components::FT1InfoSHA1>());
@@ -1007,6 +1074,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 			} else {
 				std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
 			}
+
+			// something happened, update chunk picker
+			auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
+			assert(static_cast<bool>(c));
+			c.emplace_or_replace<ChunkPickerUpdateTag>();
 		} else {
 			// bad chunk
 			std::cout << "SHA1_NGCFT1: got BAD chunk from " << e.group_number << ":" << e.peer_number << " [" << info.chunks.at(chunk_index) << "] ; instead got [" << SHA1Digest{got_hash} << "]\n";
@@ -1014,6 +1086,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 
 		// remove from requested
 		// TODO: remove at init and track running transfers differently
+		// should be done, double check later
 		for (const auto it : transfer.getChunk().chunk_indices) {
 			o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
 		}
@@ -1132,7 +1205,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
 	reg_ptr->emplace<Message::Components::MessageFileObject>(new_msg_e, ce);
 
 	// HACK: assume the message sender is participating. usually a safe bet.
-	addParticipation(c, ce);
+	if (addParticipation(c, ce)) {
+		// something happened, update chunk picker
+		assert(static_cast<bool>(c));
+		c.emplace_or_replace<ChunkPickerUpdateTag>();
+	}
 
 	// HACK: assume the message sender has all
 	ce.get_or_emplace<Components::RemoteHave>().others[c] = {true, {}};
@@ -1322,15 +1399,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 		}
 
 		// TODO: we dont want chunks anymore
-		// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
-		auto it = self->_queue_content_want_chunk.begin();
-		while (
-			it != self->_queue_content_want_chunk.end() &&
-			(it = std::find(it, self->_queue_content_want_chunk.end(), ce)) != self->_queue_content_want_chunk.end()
-		) {
-			it = self->_queue_content_want_chunk.erase(it);
-		}
 	} else {
 		// TODO: backend
 		ce = {self->_os.registry(), self->_os.registry().create()};
@@ -1368,6 +1437,15 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 		ce.emplace<Message::Components::Transfer::BytesSent>(0u);
 	}
 
+	// something happened, update all chunk pickers
+	if (ce.all_of<Components::SuspectedParticipants>()) {
+		for (const auto& pcv : ce.get<Components::SuspectedParticipants>().participants) {
+			Contact3Handle pch{self->_cr, pcv};
+			assert(static_cast<bool>(pch));
+			pch.emplace_or_replace<ChunkPickerUpdateTag>();
+		}
+	}
+
 	const auto c_self = self->_cr.get<Contact::Components::Self>(c).self;
 	if (!self->_cr.valid(c_self)) {
 		std::cerr << "SHA1_NGCFT1 error: failed to get self!\n";
@@ -1450,7 +1528,7 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
 		return false;
 	}
 
-	c.remove<ChunkPicker>();
+	c.remove<ChunkPicker, ChunkPickerTimer, ChunkPickerUpdateTag>();
 
 	for (const auto& [_, o] : _info_to_content) {
 		removeParticipation(c, o);
@@ -1503,7 +1581,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
 	const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
 
 	// we might not know yet
-	addParticipation(c, o);
+	if (addParticipation(c, o)) {
+		// something happened, update chunk picker
+		assert(static_cast<bool>(c));
+		c.emplace_or_replace<ChunkPickerUpdateTag>();
+	}
 
 	auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others;
 	if (!remote_have.contains(c)) {
@@ -1583,7 +1665,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
 	const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
 
 	// we might not know yet
-	addParticipation(c, o);
+	if (addParticipation(c, o)) {
+		// something happened, update chunk picker
+		assert(static_cast<bool>(c));
+		c.emplace_or_replace<ChunkPickerUpdateTag>();
+	}
 
 	auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others;
 	if (!remote_have.contains(c)) {
@@ -1646,11 +1732,14 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) {
 		return false;
 	}
 
-	// add them to participants
+	// add to participants
 	const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
 	auto o = itc_it->second;
-	const bool was_new = addParticipation(c, o);
-	if (was_new) {
+	if (addParticipation(c, o)) {
+		// something happened, update chunk picker
+		assert(static_cast<bool>(c));
+		c.emplace_or_replace<ChunkPickerUpdateTag>();
+
 		std::cout << "SHA1_NGCFT1: and we were interested!\n";
 		// we should probably send the bitset back here / add to queue (can be multiple packets)
 	}
diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
index ebde0fb..bd94b9f 100644
--- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
+++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
@@ -73,7 +73,6 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
 
 	// makes request rotate around open content
 	std::deque<ObjectHandle> _queue_content_want_info;
-	std::deque<ObjectHandle> _queue_content_want_chunk;
 
 	std::atomic_bool _info_builder_dirty {false};
 	std::mutex _info_builder_queue_mutex;
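
Note on the "TODO: add small random bias to spread load" next to the 60s rearm above: one possible way to address it, as a sketch with a hypothetical helper name, not part of this patch:

#include <random>

// rearm with 60s +-10% jitter so contacts tagged in the same tick do not
// all come due again in the same future tick
static float jitteredRearm(const float base = 60.f) {
	static thread_local std::mt19937 rng{std::random_device{}()};
	std::uniform_real_distribution<float> dist{0.9f * base, 1.1f * base};
	return dist(rng);
}

// usage at both places the patch writes the fixed 60s value:
//   c.get_or_emplace<ChunkPickerTimer>().timer = jitteredRearm();
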