rework chunk picker update logic and participation logic

disable most spammy log
This commit is contained in:
Green Sky 2024-07-08 18:12:47 +02:00
parent 79e3070422
commit e5681b4ad5
No known key found for this signature in database
7 changed files with 269 additions and 43 deletions

View File

@ -59,6 +59,8 @@ add_library(solanaceae_sha1_ngcft1
./solanaceae/ngc_ft1_sha1/components.hpp
./solanaceae/ngc_ft1_sha1/components.cpp
./solanaceae/ngc_ft1_sha1/contact_components.hpp
./solanaceae/ngc_ft1_sha1/chunk_picker.hpp
./solanaceae/ngc_ft1_sha1/chunk_picker.cpp

View File

@ -3,6 +3,7 @@
#include <solanaceae/tox_contacts/components.hpp>
#include "./components.hpp"
#include "./contact_components.hpp"
#include <algorithm>
@ -111,6 +112,64 @@ void ChunkPicker::updateParticipation(
Contact3Handle c,
ObjectRegistry& objreg
) {
if (!c.all_of<Contact::Components::FT1Participation>()) {
participating_unfinished.clear();
return;
}
entt::dense_set<Object> checked;
for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) {
const ObjectHandle o {objreg, ov};
if (participating_unfinished.contains(o)) {
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
participating_unfinished.erase(o);
continue;
}
if (o.all_of<Message::Components::Transfer::TagPaused>()) {
participating_unfinished.erase(o);
continue;
}
if (o.get<Components::FT1ChunkSHA1Cache>().have_all) {
participating_unfinished.erase(o);
}
} else {
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
continue;
}
if (o.all_of<Message::Components::Transfer::TagPaused>()) {
continue;
}
if (!o.get<Components::FT1ChunkSHA1Cache>().have_all) {
participating_unfinished.emplace(o, ParticipationEntry{});
}
}
checked.emplace(o);
}
// now we still need to remove left over unfinished.
// TODO: how did they get left over
entt::dense_set<Object> to_remove;
for (const auto& [o, _] : participating_unfinished) {
if (!checked.contains(o)) {
std::cerr << "unfinished contained non participating\n";
to_remove.emplace(o);
}
}
for (const auto& o : to_remove) {
participating_unfinished.erase(o);
}
}
void ChunkPicker::validateParticipation(
Contact3Handle c,
ObjectRegistry& objreg
) {
#if 0
// replaces them in place
participating.clear();
participating_unfinished.clear();
@ -128,6 +187,40 @@ void ChunkPicker::updateParticipation(
participating_unfinished.emplace(o, ParticipationEntry{});
}
}
#endif
entt::dense_set<Object> val_p;
//entt::dense_set<Object> val_pu;
//participating_unfinished.clear(); // HACK: rn this update unfished o.o
for (const Object ov : objreg.view<Components::SuspectedParticipants>()) {
const ObjectHandle o {objreg, ov};
val_p.emplace(o);
//if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
// continue;
//}
//if (!o.get<Components::FT1ChunkSHA1Cache>().have_all) {
// //val_pu.emplace(o);
// //participating_unfinished.emplace(o, ParticipationEntry{});
//}
}
// validate
if (c.all_of<Contact::Components::FT1Participation>()) {
const auto& participating = c.get<Contact::Components::FT1Participation>().participating;
assert(val_p.size() == participating.size());
//assert(val_pu.size() == participating_unfinished.size());
for (const auto& it : val_p) {
assert(participating.contains(it));
}
}
//for (const auto& it : val_pu) {
// assert(participating_unfinished.contains(it));
//}
}
std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
@ -146,6 +239,8 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
}
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
updateParticipation(c, objreg);
if (participating_unfinished.empty()) {
participating_in_last = entt::null;
return {};

View File

@ -19,6 +19,13 @@
// goal is to always keep 2 transfers running and X(6) requests queued up
// per peer
struct ChunkPickerUpdateTag {};
struct ChunkPickerTimer {
// adds update tag on 0
float timer {0.f};
};
// contact component?
struct ChunkPicker {
// max transfers
@ -41,13 +48,20 @@ struct ChunkPicker {
};
// TODO: only unfinished?
entt::dense_map<Object, ParticipationEntry> participating_unfinished;
entt::dense_set<Object> participating;
Object participating_in_last {entt::null};
private:
// updates participating_unfinished
void updateParticipation(
Contact3Handle c,
ObjectRegistry& objreg
);
public:
void validateParticipation(
Contact3Handle c,
ObjectRegistry& objreg
);
// tick
//void sendInfoRequests();

View File

@ -0,0 +1,13 @@
#pragma once
#include <solanaceae/object_store/object_store.hpp>
#include <entt/container/dense_set.hpp>
namespace Contact::Components {
struct FT1Participation {
entt::dense_set<Object> participating;
};
} // Contact::Components

View File

@ -1,9 +1,14 @@
#include "./participation.hpp"
#include "./contact_components.hpp"
#include "./chunk_picker.hpp"
#include <iostream>
bool addParticipation(Contact3Handle c, ObjectHandle o) {
bool was_new {false};
assert(static_cast<bool>(o));
assert(static_cast<bool>(c));
if (static_cast<bool>(o)) {
const auto [_, inserted] = o.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
@ -11,24 +16,33 @@ bool addParticipation(Contact3Handle c, ObjectHandle o) {
}
if (static_cast<bool>(c)) {
const auto [_, inserted] = c.get_or_emplace<ChunkPicker>().participating.emplace(o);
const auto [_, inserted] = c.get_or_emplace<Contact::Components::FT1Participation>().participating.emplace(o);
was_new = was_new || inserted;
// TODO: if not have_all
c.get_or_emplace<ChunkPicker>().participating_unfinished.emplace(o, ChunkPicker::ParticipationEntry{});
}
std::cout << "added " << (was_new?"new ":"") << "participant\n";
return was_new;
}
void removeParticipation(Contact3Handle c, ObjectHandle o) {
assert(static_cast<bool>(o));
assert(static_cast<bool>(c));
if (static_cast<bool>(o) && o.all_of<Components::SuspectedParticipants>()) {
o.get<Components::SuspectedParticipants>().participants.erase(c);
}
if (static_cast<bool>(c) && c.all_of<ChunkPicker>()) {
c.get<ChunkPicker>().participating.erase(o);
c.get<ChunkPicker>().participating_unfinished.erase(o);
if (static_cast<bool>(c)) {
if (c.all_of<Contact::Components::FT1Participation>()) {
c.get<Contact::Components::FT1Participation>().participating.erase(o);
}
if (c.all_of<ChunkPicker>()) {
c.get<ChunkPicker>().participating_unfinished.erase(o);
}
}
std::cout << "removed participant\n";
}

View File

@ -19,6 +19,7 @@
#include "./file_rw_mapped.hpp"
#include "./components.hpp"
#include "./contact_components.hpp"
#include "./chunk_picker.hpp"
#include "./participation.hpp"
@ -217,7 +218,7 @@ SHA1_NGCFT1::SHA1_NGCFT1(
}
float SHA1_NGCFT1::iterate(float delta) {
std::cerr << "---------- new tick ----------\n";
//std::cerr << "---------- new tick ----------\n";
// info builder queue
if (_info_builder_dirty) {
std::lock_guard l{_info_builder_queue_mutex};
@ -392,26 +393,59 @@ float SHA1_NGCFT1::iterate(float delta) {
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
}
}
}
// new chunk picker code
// ran regardless of _max_concurrent_in
{ // new chunk picker code
// HACK: work around missing contact events
std::vector<Contact3Handle> c_offline_to_remove;
std::vector<Contact3Handle> cp_to_remove;
_cr.view<ChunkPicker>().each([this, &_peer_open_requests, &cp_to_remove](const Contact3 cv, ChunkPicker& cp) {
// first, update timers
_cr.view<ChunkPickerTimer>().each([this, delta](const Contact3 cv, ChunkPickerTimer& cpt) {
cpt.timer -= delta;
if (cpt.timer <= 0.f) {
_cr.emplace_or_replace<ChunkPickerUpdateTag>(cv);
}
});
//std::cout << "number of chunkpickers: " << _cr.storage<ChunkPicker>().size() << ", of which " << _cr.storage<ChunkPickerUpdateTag>().size() << " need updating\n";
// now check for potentially missing cp
auto cput_view = _cr.view<ChunkPickerUpdateTag>();
cput_view.each([this, &c_offline_to_remove](const Contact3 cv) {
Contact3Handle c{_cr, cv};
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
cp_to_remove.push_back(c);
//std::cout << "cput :)\n";
if (!c.any_of<Contact::Components::ToxGroupPeerEphemeral, Contact::Components::FT1Participation>()) {
std::cout << "cput uh nuh :(\n";
c_offline_to_remove.push_back(c);
return;
}
if (!c.all_of<ChunkPicker>()) {
std::cout << "creating new cp!!\n";
c.emplace<ChunkPicker>();
c.emplace_or_replace<ChunkPickerTimer>();
}
});
// now update all cp that are tagged
_cr.view<ChunkPicker, ChunkPickerUpdateTag>().each([this, &_peer_open_requests, &cp_to_remove, &c_offline_to_remove](const Contact3 cv, ChunkPicker& cp) {
Contact3Handle c{_cr, cv};
if (!c.any_of<Contact::Components::ToxGroupPeerEphemeral, Contact::Components::FT1Participation>()) {
//cp_to_remove.push_back(c);
c_offline_to_remove.push_back(c);
return;
}
//std::cout << "cpu :)\n";
// HACK: expensive, dont do every tick, only on events
// do verification in debug instead?
cp.updateParticipation(
c,
_os.registry()
);
assert(!cp.participating.empty());
//cp.validateParticipation(c, _os.registry());
size_t peer_open_request = 0;
if (_peer_open_requests.contains(c)) {
@ -426,6 +460,15 @@ float SHA1_NGCFT1::iterate(float delta) {
);
if (new_requests.empty()) {
// updateChunkRequests updates the unfinished
// TODO: pull out and check there?
if (cp.participating_unfinished.empty()) {
std::cout << "destorying empty useless cp\n";
cp_to_remove.push_back(c);
} else {
c.get_or_emplace<ChunkPickerTimer>().timer = 60.f;
}
return;
}
@ -444,10 +487,21 @@ float SHA1_NGCFT1::iterate(float delta) {
);
std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(r_idx) << "] from " << group_number << ":" << peer_number << "\n";
}
// force update every minute
// TODO: add small random bias to spread load
c.get_or_emplace<ChunkPickerTimer>().timer = 60.f;
});
// unmark all marked
_cr.clear<ChunkPickerUpdateTag>();
assert(_cr.storage<ChunkPickerUpdateTag>().empty());
for (const auto& c : cp_to_remove) {
c.remove<ChunkPicker>();
c.remove<ChunkPicker, ChunkPickerTimer>();
}
for (const auto& c : c_offline_to_remove) {
c.remove<ChunkPicker, ChunkPickerTimer>();
for (const auto& [_, o] : _info_to_content) {
removeParticipation(c, o);
@ -557,11 +611,6 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
}
}
if (!cc.have_all) {
// now, enque
_queue_content_want_chunk.push_back(ce);
}
}
ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
@ -596,6 +645,16 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
ce.remove<Message::Components::Transfer::TagPaused>();
// start requesting from all participants
if (ce.all_of<Components::SuspectedParticipants>()) {
std::cout << "accepted ft has " << ce.get<Components::SuspectedParticipants>().participants.size() << " sp\n";
for (const auto cv : ce.get<Components::SuspectedParticipants>().participants) {
_cr.emplace_or_replace<ChunkPickerUpdateTag>(cv);
}
} else {
std::cout << "accepted ft has NO sp!\n";
}
// should?
e.e.remove<Message::Components::Transfer::ActionAccept>();
@ -663,7 +722,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
{ // they advertise interest in the content
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
addParticipation(c, o);
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
}
assert(o.all_of<Components::FT1ChunkSHA1Cache>());
@ -729,7 +792,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
{ // they have the content (probably, might be fake, should move this to done)
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
addParticipation(c, o);
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
}
assert(o.all_of<Components::FT1InfoSHA1>());
@ -1007,6 +1074,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
} else {
std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
}
// something happend, update chunk picker
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
} else {
// bad chunk
std::cout << "SHA1_NGCFT1: got BAD chunk from " << e.group_number << ":" << e.peer_number << " [" << info.chunks.at(chunk_index) << "] ; instead got [" << SHA1Digest{got_hash} << "]\n";
@ -1014,6 +1086,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
// remove from requested
// TODO: remove at init and track running transfers differently
// should be done, double check later
for (const auto it : transfer.getChunk().chunk_indices) {
o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
}
@ -1132,7 +1205,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
reg_ptr->emplace<Message::Components::Content>(new_msg_e, ce);
// HACK: assume the message sender is participating. usually a safe bet.
addParticipation(c, ce);
if (addParticipation(c, ce)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
// HACK: assume the message sender has all
ce.get_or_emplace<Components::RemoteHave>().others[c] = {true, {}};
@ -1322,15 +1399,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
}
// TODO: we dont want chunks anymore
// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
auto it = self->_queue_content_want_chunk.begin();
while (
it != self->_queue_content_want_chunk.end() &&
(it = std::find(it, self->_queue_content_want_chunk.end(), ce)) != self->_queue_content_want_chunk.end()
) {
it = self->_queue_content_want_chunk.erase(it);
}
} else {
// TODO: backend
ce = {self->_os.registry(), self->_os.registry().create()};
@ -1368,6 +1437,15 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
ce.emplace<Message::Components::Transfer::BytesSent>(0u);
}
// something happend, update all chunk pickers
if (ce.all_of<Components::SuspectedParticipants>()) {
for (const auto& pcv : ce.get<Components::SuspectedParticipants>().participants) {
Contact3Handle pch{self->_cr, pcv};
assert(static_cast<bool>(pch));
pch.emplace_or_replace<ChunkPickerUpdateTag>();
}
}
const auto c_self = self->_cr.get<Contact::Components::Self>(c).self;
if (!self->_cr.valid(c_self)) {
std::cerr << "SHA1_NGCFT1 error: failed to get self!\n";
@ -1450,7 +1528,7 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
return false;
}
c.remove<ChunkPicker>();
c.remove<ChunkPicker, ChunkPickerUpdateTag, ChunkPickerTimer>();
for (const auto& [_, o] : _info_to_content) {
removeParticipation(c, o);
@ -1503,7 +1581,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
// we might not know yet
addParticipation(c, o);
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others;
if (!remote_have.contains(c)) {
@ -1583,7 +1665,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
// we might not know yet
addParticipation(c, o);
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others;
if (!remote_have.contains(c)) {
@ -1646,11 +1732,14 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) {
return false;
}
// add them to participants
// add to participants
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
auto o = itc_it->second;
const bool was_new = addParticipation(c, o);
if (was_new) {
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
std::cout << "SHA1_NGCFT1: and we where interested!\n";
// we should probably send the bitset back here / add to queue (can be multiple packets)
}

View File

@ -73,7 +73,6 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
// makes request rotate around open content
std::deque<ObjectHandle> _queue_content_want_info;
std::deque<ObjectHandle> _queue_content_want_chunk;
std::atomic_bool _info_builder_dirty {false};
std::mutex _info_builder_queue_mutex;