big ft fixes, mostly for info, but also other stuff
This commit is contained in:
parent
6ad2905e07
commit
60b3d5d941
@ -49,7 +49,8 @@ void ReAnnounceTimer::reset(void) {
|
||||
|
||||
void ReAnnounceTimer::lower(void) {
|
||||
timer *= 0.1f;
|
||||
last_max *= 0.1f;
|
||||
//last_max *= 0.1f; // is this a good idea?
|
||||
last_max *= 0.9f; // is this a good idea?
|
||||
}
|
||||
|
||||
void TransferStatsTally::Peer::trimSent(const float time_now) {
|
||||
|
@ -18,6 +18,12 @@ void re_announce(
|
||||
std::vector<Object> to_remove;
|
||||
os_reg.view<Components::ReAnnounceTimer>().each([&os_reg, &cr, &neep, &to_remove, delta](Object ov, Components::ReAnnounceTimer& rat) {
|
||||
ObjectHandle o{os_reg, ov};
|
||||
// if no known targets, or no hash, remove
|
||||
if (!o.all_of<Components::AnnounceTargets, Components::FT1InfoSHA1Hash>()) {
|
||||
to_remove.push_back(ov);
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: pause
|
||||
//// if paused -> remove
|
||||
//if (o.all_of<Message::Components::Transfer::TagPaused>()) {
|
||||
@ -25,11 +31,11 @@ void re_announce(
|
||||
// return;
|
||||
//}
|
||||
|
||||
// if not downloading or info incomplete -> remove
|
||||
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) {
|
||||
// // if not downloading or info incomplete -> remove
|
||||
//if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) {
|
||||
// if not downloading AND info complete -> remove
|
||||
if (!o.all_of<Components::FT1ChunkSHA1Cache>() && o.all_of<Components::FT1InfoSHA1Data>()) {
|
||||
to_remove.push_back(ov);
|
||||
// TODO: triggers with hs, figure out why
|
||||
//assert(false && "transfer in broken state");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -69,6 +69,12 @@ void SHA1_NGCFT1::updateMessages(ObjectHandle o) {
|
||||
assert(o.all_of<Components::Messages>());
|
||||
|
||||
for (auto msg : o.get<Components::Messages>().messages) {
|
||||
// FIXME: hs might create and destory messages for objects without events
|
||||
// we should really do garbage collection
|
||||
if (!msg) {
|
||||
continue;
|
||||
}
|
||||
|
||||
msg.emplace_or_replace<Message::Components::MessageFileObject>(o);
|
||||
|
||||
// messages no long hold this info
|
||||
@ -82,33 +88,20 @@ void SHA1_NGCFT1::updateMessages(ObjectHandle o) {
|
||||
std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ObjectHandle ce) {
|
||||
// get a list of peers we can request this file from
|
||||
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
|
||||
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
|
||||
// TODO: sort by con state?
|
||||
// prio to direct?
|
||||
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
|
||||
continue;
|
||||
|
||||
// 1 in 20 chance to ask random peer instead
|
||||
// also works well for empty SuspectedParticipants
|
||||
if ((_rng()%20) == 0) {
|
||||
tox_peers.clear();
|
||||
// or messages? should be the same
|
||||
if (!ce.all_of<Components::AnnounceTargets>()) {
|
||||
// rip
|
||||
std::cerr << "SHA1_NGCFT1 warning: tried random, but no AnnounceTargets\n";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
|
||||
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
|
||||
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
|
||||
}
|
||||
}
|
||||
|
||||
// 1 in 40 chance to ask random peer instead
|
||||
// TODO: config + tweak
|
||||
// TODO: save group in content to avoid the tox_peers list build
|
||||
// TODO: remove once pc1_announce is shipped
|
||||
if (tox_peers.empty() || (_rng()%40) == 0) {
|
||||
// meh
|
||||
// HACK: determain group based on last tox_peers
|
||||
if (!tox_peers.empty()) {
|
||||
const uint32_t group_number = tox_peers.back().first;
|
||||
auto gch = _tcm.getContactGroup(group_number);
|
||||
assert(static_cast<bool>(gch));
|
||||
|
||||
std::vector<uint32_t> un_tox_peers;
|
||||
for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
|
||||
for (const auto& target : ce.get<Components::AnnounceTargets>().targets) {
|
||||
for (const auto child : _cr.get<Contact::Components::ParentOf>(target).subs) {
|
||||
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
|
||||
continue;
|
||||
}
|
||||
@ -119,26 +112,39 @@ std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(O
|
||||
|
||||
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
|
||||
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
|
||||
un_tox_peers.push_back(tgpe.peer_number);
|
||||
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
|
||||
}
|
||||
}
|
||||
if (un_tox_peers.empty()) {
|
||||
// no one online, we are out of luck
|
||||
} else {
|
||||
const size_t sample_i = _rng()%un_tox_peers.size();
|
||||
const auto peer_number = un_tox_peers.at(sample_i);
|
||||
}
|
||||
std::cout << "SHA1_NGCFT1: doing random peer select over " << tox_peers.size() << " peers\n";
|
||||
} else if (ce.all_of<Components::SuspectedParticipants>()) {
|
||||
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
|
||||
// TODO: sort by con state?
|
||||
// prio to direct?
|
||||
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
|
||||
continue;
|
||||
}
|
||||
|
||||
return std::make_pair(group_number, peer_number);
|
||||
if (_cr.all_of<Contact::Components::TagSelfStrong>(c)) {
|
||||
// FIXME: how did we select ourselfs to be a suspected participant
|
||||
continue;
|
||||
}
|
||||
|
||||
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
|
||||
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
|
||||
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const size_t sample_i = _rng()%tox_peers.size();
|
||||
const auto [group_number, peer_number] = tox_peers.at(sample_i);
|
||||
|
||||
return std::make_pair(group_number, peer_number);
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
if (tox_peers.empty()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
const size_t sample_i = _rng()%tox_peers.size();
|
||||
const auto [group_number, peer_number] = tox_peers.at(sample_i);
|
||||
|
||||
return std::make_pair(group_number, peer_number);
|
||||
}
|
||||
|
||||
void SHA1_NGCFT1::queueBitsetSendFull(Contact3Handle c, ObjectHandle o) {
|
||||
@ -247,7 +253,6 @@ SHA1_NGCFT1::SHA1_NGCFT1(
|
||||
}
|
||||
|
||||
float SHA1_NGCFT1::iterate(float delta) {
|
||||
//std::cerr << "---------- new tick ----------\n";
|
||||
_mfb.tick(); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast
|
||||
|
||||
_peer_open_requests.clear();
|
||||
@ -293,7 +298,6 @@ float SHA1_NGCFT1::iterate(float delta) {
|
||||
assert(!o.any_of<ObjComp::F::TagLocalHaveAll>());
|
||||
|
||||
_queue_content_want_info.push_back(o);
|
||||
//_os.registry().remove<Components::ReRequestInfoTimer>(e);
|
||||
o.remove<Components::ReRequestInfoTimer>();
|
||||
// TODO: throw update?
|
||||
}
|
||||
@ -420,6 +424,8 @@ float SHA1_NGCFT1::iterate(float delta) {
|
||||
assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
|
||||
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
|
||||
|
||||
//std::cout << "SHA1_NGCFT1: trying to request info\n";
|
||||
|
||||
auto selected_peer_opt = selectPeerForRequest(ce);
|
||||
if (selected_peer_opt.has_value()) {
|
||||
const auto [group_number, peer_number] = selected_peer_opt.value();
|
||||
@ -434,10 +440,12 @@ float SHA1_NGCFT1::iterate(float delta) {
|
||||
);
|
||||
ce.emplace<Components::ReRequestInfoTimer>(0.f);
|
||||
|
||||
_queue_content_want_info.pop_front();
|
||||
|
||||
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
|
||||
} else {
|
||||
_queue_content_want_info.push_back(ce);
|
||||
}
|
||||
|
||||
_queue_content_want_info.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
@ -573,27 +581,24 @@ void SHA1_NGCFT1::constructFileMessageInPlace(Message3Handle msg, NGCFT1_file_ki
|
||||
o.get_or_emplace<Components::Messages>().messages.push_back(msg);
|
||||
msg.emplace_or_replace<Message::Components::MessageFileObject>(o);
|
||||
|
||||
// TODO: remove this assumption, this gets very unrelieable with hs
|
||||
// TODO: use to_c instead?
|
||||
if (const auto* from_c_comp = msg.try_get<Message::Components::ContactFrom>(); from_c_comp != nullptr && _cr.valid(from_c_comp->c)) {
|
||||
Contact3Handle c{_cr, from_c_comp->c};
|
||||
// HACK: assume the message sender is participating. usually a safe bet.
|
||||
if (addParticipation(c, o)) {
|
||||
// something happend, update chunk picker
|
||||
assert(static_cast<bool>(c));
|
||||
c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
}
|
||||
|
||||
// HACK: assume the message sender has all
|
||||
o.get_or_emplace<Components::RemoteHaveBitset>().others[c] = {true, {}};
|
||||
|
||||
// TODO: check if public
|
||||
// since public
|
||||
if (c.all_of<Contact::Components::Parent>()) {
|
||||
// TODO: if this is a dummy contact, should it have parent?
|
||||
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
|
||||
} else {
|
||||
std::cerr << "SHA1_NGCFT1 warning: from contact has no parent, cant add to AnnounceTargets\n";
|
||||
}
|
||||
// TODO: if private, add c directly
|
||||
}
|
||||
|
||||
// queue announce that we are participating
|
||||
o.get_or_emplace<Components::ReAnnounceTimer>(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
|
||||
|
||||
// TODO: queue info dl
|
||||
// TODO: queue info/check if we already have info
|
||||
if (!o.all_of<Components::ReRequestInfoTimer>() && !o.all_of<Components::FT1InfoSHA1>()) {
|
||||
@ -1301,63 +1306,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
|
||||
|
||||
constructFileMessageInPlace({reg, new_msg_e}, e.file_kind, {e.file_id, e.file_id_size});
|
||||
|
||||
#if 0
|
||||
// check if content exists
|
||||
const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
|
||||
ObjectHandle o;
|
||||
if (_info_to_content.count(sha1_info_hash)) {
|
||||
o = _info_to_content.at(sha1_info_hash);
|
||||
std::cout << "SHA1_NGCFT1: new message has existing content\n";
|
||||
} else {
|
||||
// TODO: backend
|
||||
o = _mfb.newObject(ByteSpan{sha1_info_hash});
|
||||
_info_to_content[sha1_info_hash] = o;
|
||||
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
|
||||
std::cout << "SHA1_NGCFT1: new message has new content\n";
|
||||
}
|
||||
o.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
|
||||
reg_ptr->emplace<Message::Components::MessageFileObject>(new_msg_e, o);
|
||||
|
||||
// HACK: assume the message sender is participating. usually a safe bet.
|
||||
if (addParticipation(c, o)) {
|
||||
// something happend, update chunk picker
|
||||
assert(static_cast<bool>(c));
|
||||
c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
}
|
||||
|
||||
// HACK: assume the message sender has all
|
||||
o.get_or_emplace<Components::RemoteHaveBitset>().others[c] = {true, {}};
|
||||
|
||||
// TODO: queue info dl
|
||||
// TODO: queue info/check if we already have info
|
||||
if (!o.all_of<Components::ReRequestInfoTimer>() && !o.all_of<Components::FT1InfoSHA1>()) {
|
||||
bool in_info_want {false};
|
||||
for (const auto it : _queue_content_want_info) {
|
||||
if (it == o) {
|
||||
in_info_want = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!in_info_want) {
|
||||
// TODO: check if already receiving
|
||||
_queue_content_want_info.push_back(o);
|
||||
}
|
||||
} else if (o.all_of<Components::FT1InfoSHA1>()){
|
||||
// remove from info want
|
||||
o.remove<Components::ReRequestInfoTimer>();
|
||||
|
||||
auto it = std::find(_queue_content_want_info.cbegin(), _queue_content_want_info.cend(), o);
|
||||
if (it != _queue_content_want_info.cend()) {
|
||||
_queue_content_want_info.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
// since public
|
||||
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
|
||||
|
||||
_os.throwEventUpdate(o);
|
||||
#endif
|
||||
|
||||
_rmm.throwEventConstruct(reg, new_msg_e);
|
||||
|
||||
return true; // false?
|
||||
@ -1696,15 +1644,21 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) {
|
||||
// something happend, update chunk picker
|
||||
// !!! this is probably too much
|
||||
assert(static_cast<bool>(c));
|
||||
c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
|
||||
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
|
||||
c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
}
|
||||
|
||||
std::cout << "SHA1_NGCFT1: and we where interested!\n";
|
||||
// we should probably send the bitset back here / add to queue (can be multiple packets)
|
||||
if (o.all_of<Components::FT1ChunkSHA1Cache>() && o.get<Components::FT1ChunkSHA1Cache>().have_count > 0) {
|
||||
queueBitsetSendFull(c, o);
|
||||
} else if (o.all_of<Components::ReAnnounceTimer>()) {
|
||||
o.get<Components::ReAnnounceTimer>().lower();
|
||||
}
|
||||
}
|
||||
|
||||
// return true instead?
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -71,7 +71,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
||||
std::deque<QBitsetEntry> _queue_send_bitset;
|
||||
|
||||
// FIXME: workaround missing contact events
|
||||
// only used to remove participation on peer exit
|
||||
// only used on peer exit (no, also used to quicken lookups)
|
||||
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
|
||||
|
||||
// reset every iterate; kept here as an allocation optimization
|
||||
|
@ -9,6 +9,9 @@
|
||||
#include <solanaceae/tox_messages/msg_components.hpp>
|
||||
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
|
||||
|
||||
// TODO: move somewhere else?
|
||||
#include <solanaceae/ngc_ft1_sha1/util.hpp>
|
||||
|
||||
#include <solanaceae/util/span.hpp>
|
||||
|
||||
#include <entt/entity/entity.hpp>
|
||||
@ -233,10 +236,14 @@ void NGCHS2Rizzler::handleMsgPack(Contact3Handle sync_by_c, const std::vector<ui
|
||||
}
|
||||
|
||||
from_c = findContactByID(_cr, id);
|
||||
|
||||
if (!_cr.valid(from_c)) {
|
||||
// create sparse contact with id only
|
||||
from_c = _cr.create();
|
||||
_cr.emplace_or_replace<Contact::Components::ID>(from_c, id);
|
||||
|
||||
// TODO: only if public message
|
||||
_cr.emplace_or_replace<Contact::Components::Parent>(from_c, sync_by_c.get<Contact::Components::Parent>().parent);
|
||||
}
|
||||
}
|
||||
|
||||
@ -333,6 +340,7 @@ void NGCHS2Rizzler::handleMsgPack(Contact3Handle sync_by_c, const std::vector<ui
|
||||
Message3Handle new_msg = new_real_msg;
|
||||
|
||||
if (dup_msg) {
|
||||
// we leak objects here (if file)
|
||||
reg.destroy(new_msg);
|
||||
new_msg = dup_msg;
|
||||
}
|
||||
@ -424,6 +432,7 @@ bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
}
|
||||
|
||||
auto& rnncl = c.get_or_emplace<Components::RunningChatLogs>();
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
|
||||
|
||||
auto& transfer = rnncl.list[e.transfer_id];
|
||||
transfer.data.reserve(e.file_size); // danger?
|
||||
@ -463,9 +472,17 @@ bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_data& e) {
|
||||
}
|
||||
|
||||
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
// TODO: fix disconnect
|
||||
if (!c) {
|
||||
// FIXME: this does not work, tcm just delteded the relation ship
|
||||
//auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
//if (!c) {
|
||||
// return false;
|
||||
//}
|
||||
const auto c_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
|
||||
if (c_it == _tox_peer_to_contact.end()) {
|
||||
return false;
|
||||
}
|
||||
auto c = c_it->second;
|
||||
if (!static_cast<bool>(c)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -481,6 +498,7 @@ bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
std::cout << "NGCHS2Rizzler: recv_done " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
|
||||
{
|
||||
auto& transfer = rnncl.list.at(e.transfer_id);
|
||||
// TODO: done might mean failed, so we might be parsing bs here
|
||||
|
||||
// use data
|
||||
// TODO: move out of packet handler
|
||||
|
@ -36,6 +36,10 @@ class NGCHS2Rizzler : public ToxEventI, public NGCFT1EventI {
|
||||
// c -> delay, timer
|
||||
std::map<Contact3, RequestQueueInfo> _request_queue;
|
||||
|
||||
// FIXME: workaround missing contact events
|
||||
// only used on peer exit (no, also used to quicken lookups)
|
||||
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
|
||||
|
||||
public:
|
||||
NGCHS2Rizzler(
|
||||
Contact3Registry& cr,
|
||||
|
@ -15,6 +15,9 @@
|
||||
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
|
||||
#include <solanaceae/ngc_ft1_sha1/components.hpp>
|
||||
|
||||
// TODO: move somewhere else?
|
||||
#include <solanaceae/ngc_ft1_sha1/util.hpp>
|
||||
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
#include "./serl.hpp"
|
||||
@ -132,6 +135,7 @@ float NGCHS2Sigma::iterate(float delta) {
|
||||
continue;
|
||||
}
|
||||
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
|
||||
_tox_peer_to_contact[combine_ids(group_number, peer_number)] = c; // cache
|
||||
|
||||
// TODO: check allowed range here
|
||||
//_max_time_into_past_default
|
||||
@ -423,8 +427,17 @@ bool NGCHS2Sigma::onEvent(const Events::NGCFT1_send_data& e) {
|
||||
|
||||
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_send_done& e) {
|
||||
// TODO: this will return null if the peer just disconnected
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
if (!c) {
|
||||
// FIXME: this does not work, tcm just delteded the relation ship
|
||||
//auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
//if (!c) {
|
||||
// return false;
|
||||
//}
|
||||
const auto c_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
|
||||
if (c_it == _tox_peer_to_contact.end()) {
|
||||
return false;
|
||||
}
|
||||
auto c = c_it->second;
|
||||
if (!static_cast<bool>(c)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -47,6 +47,10 @@ class NGCHS2Sigma : public RegistryMessageModelEventI, public NGCFT1EventI {
|
||||
constexpr static bool _only_send_self_observed {true};
|
||||
constexpr static int64_t _max_time_into_past_default {60*15}; // s
|
||||
|
||||
// FIXME: workaround missing contact events
|
||||
// only used on peer exit (no, also used to quicken lookups)
|
||||
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
|
||||
|
||||
public:
|
||||
NGCHS2Sigma(
|
||||
Contact3Registry& cr,
|
||||
|
Loading…
x
Reference in New Issue
Block a user