request info for messages with out info (only hash)

This commit is contained in:
Green Sky 2023-08-14 17:08:27 +02:00
parent 1f53bc9e54
commit 9b4a258e48
No known key found for this signature in database
3 changed files with 165 additions and 42 deletions

View File

@ -438,7 +438,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) {
} }
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) { bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
std::cout << "NGCFT1: FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << e.transfer_id << " [" << bin2hex(e.file_id) << "]\n"; std::cout << "NGCFT1: FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << int(e.transfer_id) << " [" << bin2hex(e.file_id) << "]\n";
bool accept = false; bool accept = false;
dispatch( dispatch(

View File

@ -14,6 +14,8 @@
#include <sodium.h> #include <sodium.h>
#include <entt/container/dense_set.hpp>
#include <iostream> #include <iostream>
#include <variant> #include <variant>
@ -51,6 +53,14 @@ namespace Components {
bool haveChunk(const SHA1Digest& hash) const; bool haveChunk(const SHA1Digest& hash) const;
}; };
struct SuspectedParticipants {
entt::dense_set<Contact3> participants;
};
struct ReRequestInfoTimer {
float timer {0.f};
};
} // Components } // Components
std::optional<size_t> Components::FT1ChunkSHA1Cache::chunkIndex(const SHA1Digest& hash) const { std::optional<size_t> Components::FT1ChunkSHA1Cache::chunkIndex(const SHA1Digest& hash) const {
@ -198,49 +208,45 @@ void SHA1_NGCFT1::iterate(float delta) {
it++; it++;
} }
} }
{ // requested info timers
std::vector<Content> timed_out;
_contentr.view<Components::ReRequestInfoTimer>().each([delta, &timed_out](Content e, Components::ReRequestInfoTimer& rrit) {
rrit.timer += delta;
// 15sec, TODO: config
if (rrit.timer >= 15.f) {
timed_out.push_back(e);
}
});
for (const auto e : timed_out) {
// TODO: avoid dups
_queue_content_want_info.push_back({_contentr, e});
_contentr.remove<Components::ReRequestInfoTimer>(e);
}
}
} }
// if we have not reached the total cap for transfers // if we have not reached the total cap for transfers
// count running transfers // count running transfers
size_t running_transfer_count {0}; size_t running_sending_transfer_count {0};
for (const auto& [_, transfers] : _sending_transfers) { for (const auto& [_, transfers] : _sending_transfers) {
running_transfer_count += transfers.size(); running_sending_transfer_count += transfers.size();
} }
if (running_transfer_count < _max_concurrent_out) { size_t running_receiving_transfer_count {0};
// for each peer? transfer cap per peer? for (const auto& [_, transfers] : _receiving_transfers) {
#if 0 running_receiving_transfer_count += transfers.size();
// first check requests for info
if (!_queue_requested_info.empty()) {
// send init to _queue_requested_info
const auto [group_number, peer_number] = _queue_requested_info.front();
if (_tcl.getGroupPeerConnectionStatus(group_number, peer_number) != TOX_CONNECTION_NONE) {
uint8_t transfer_id {0};
if (_tcl.sendFT1InitPrivate(
group_number, peer_number,
NGC_FT1_file_kind::HASH_SHA1_INFO,
_sha1_info_hash.data.data(), _sha1_info_hash.size(), // id (info hash)
_sha1_info_data.size(), // "file_size"
transfer_id
)) {
_transfers_requested_info.push_back({
group_number, peer_number,
transfer_id,
0.f
});
_queue_requested_info.pop_front();
} }
}
} else if (running_sending_transfer_count < _max_concurrent_out) {
#endif // TODO: for each peer? transfer cap per peer?
// TODO: info queue
if (!_queue_requested_chunk.empty()) { // then check for chunk requests if (!_queue_requested_chunk.empty()) { // then check for chunk requests
const auto [group_number, peer_number, msg, chunk_hash, _] = _queue_requested_chunk.front(); const auto [group_number, peer_number, ce, chunk_hash, _] = _queue_requested_chunk.front();
auto chunk_idx_opt = msg.get<Components::FT1ChunkSHA1Cache>().chunkIndex(chunk_hash); auto chunk_idx_opt = ce.get<Components::FT1ChunkSHA1Cache>().chunkIndex(chunk_hash);
if (chunk_idx_opt.has_value()) { if (chunk_idx_opt.has_value()) {
const auto& info = msg.get<Components::FT1InfoSHA1>(); const auto& info = ce.get<Components::FT1InfoSHA1>();
uint8_t transfer_id {0}; uint8_t transfer_id {0};
if (_nft.NGC_FT1_send_init_private( if (_nft.NGC_FT1_send_init_private(
@ -253,13 +259,106 @@ void SHA1_NGCFT1::iterate(float delta) {
_sending_transfers _sending_transfers
[combineIds(group_number, peer_number)] [combineIds(group_number, peer_number)]
[transfer_id] // TODO: also save index? [transfer_id] // TODO: also save index?
.v = SendingTransfer::Chunk{msg, chunk_idx_opt.value() * info.chunk_size}; .v = SendingTransfer::Chunk{ce, chunk_idx_opt.value() * info.chunk_size};
} }
} }
// remove from queue regardless // remove from queue regardless
_queue_requested_chunk.pop_front(); _queue_requested_chunk.pop_front();
} }
} }
if (running_receiving_transfer_count < _max_concurrent_in) {
// strictly priorize info
if (!_queue_content_want_info.empty()) {
const auto ce = _queue_content_want_info.front();
// make sure we are missing the info
assert(!ce.all_of<Components::ReRequestInfoTimer>());
assert(!ce.all_of<Components::FT1InfoSHA1>());
assert(!ce.all_of<Components::FT1InfoSHA1Data>());
assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
// get a list of peers we can request this file from
// TODO: randomly request from non SuspectedParticipants
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
// TODO: sort by con state?
// prio to direct?
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
}
}
// 1 in 20 chance to ask random peer instead
// TODO: config + tweak
// TODO: save group in content to avoid the tox_peers list build
if (tox_peers.empty() || (_rng()%20) == 0) {
// meh
// HACK: determain group based on last tox_peers
if (!tox_peers.empty()) {
const uint32_t group_number = tox_peers.back().first;
auto gch = _tcm.getContactGroup(group_number);
assert(static_cast<bool>(gch));
std::vector<uint32_t> un_tox_peers;
for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
un_tox_peers.push_back(tgpe.peer_number);
}
}
if (un_tox_peers.empty()) {
// no one online, we are out of luck
} else {
const size_t sample_i = _rng()%un_tox_peers.size();
const auto peer_number = un_tox_peers.at(sample_i);
//const auto& info = msg.get<Components::FT1InfoSHA1>();
const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;
_nft.NGC_FT1_send_request_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
info_hash.data(), info_hash.size()
);
ce.emplace<Components::ReRequestInfoTimer>(0.f);
_queue_content_want_info.pop_front();
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << " (rng)\n";
}
}
} else {
const size_t sample_i = _rng()%tox_peers.size();
const auto [group_number, peer_number] = tox_peers.at(sample_i);
//const auto& info = msg.get<Components::FT1InfoSHA1>();
const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;
_nft.NGC_FT1_send_request_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
info_hash.data(), info_hash.size()
);
ce.emplace<Components::ReRequestInfoTimer>(0.f);
_queue_content_want_info.pop_front();
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
}
} else if (!_queue_content_want_chunk.empty()) {
}
}
} }
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
@ -317,17 +416,22 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
return false; return false;
} }
auto msg = _chunks.at(chunk_hash); auto ce = _chunks.at(chunk_hash);
{ // they advertise interest in the content
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
}
assert(msg.all_of<Components::FT1ChunkSHA1Cache>()); assert(msg.all_of<Components::FT1ChunkSHA1Cache>());
if (!msg.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) { if (!ce.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
// we dont have the chunk // we dont have the chunk
return false; return false;
} }
// queue good request // queue good request
queueUpRequestChunk(e.group_number, e.peer_number, msg, chunk_hash); queueUpRequestChunk(e.group_number, e.peer_number, ce, chunk_hash);
} else { } else {
assert(false && "unhandled case"); assert(false && "unhandled case");
} }
@ -411,6 +515,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
return false; return false;
} }
const auto& tv = peer_transfers[e.transfer_id].v;
if (std::holds_alternative<SendingTransfer::Chunk>(tv)) {
updateMessages(std::get<SendingTransfer::Chunk>(tv).content); // mostly for sent bytes
}
peer_transfers.erase(e.transfer_id); peer_transfers.erase(e.transfer_id);
return true; return true;
@ -490,6 +598,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
} }
ce.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e}); ce.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
if (!ce.all_of<Components::ReRequestInfoTimer>() && !ce.all_of<Components::FT1InfoSHA1>()) {
// TODO: check if already receiving
_queue_content_want_info.push_back(ce);
}
// TODO: queue info dl // TODO: queue info dl
//reg_ptr->emplace<Components::FT1InfoSHA1>(e, sha1_info); //reg_ptr->emplace<Components::FT1InfoSHA1>(e, sha1_info);
@ -598,9 +713,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
cc.have_count = sha1_info.chunks.size(); // need? cc.have_count = sha1_info.chunks.size(); // need?
_info_to_content[sha1_info_hash] = ce; _info_to_content[sha1_info_hash] = ce;
for (size_t i = 0; i < sha1_info.chunks.size(); i++) { for (size_t i = sha1_info.chunks.size(); i > 0; i--) {
_chunks[sha1_info.chunks[i]] = ce; _chunks[sha1_info.chunks[i-1]] = ce;
cc.chunk_hash_to_index[sha1_info.chunks[i]] = i; // chunks can have more then 1 index ..., for now, build reverse and have the first index be the real index
cc.chunk_hash_to_index[sha1_info.chunks[i-1]] = i-1;
} }
} }

View File

@ -15,6 +15,7 @@
#include <entt/container/dense_map.hpp> #include <entt/container/dense_map.hpp>
#include <variant> #include <variant>
#include <random>
enum class Content : uint32_t {}; enum class Content : uint32_t {};
using ContentRegistry = entt::basic_registry<Content>; using ContentRegistry = entt::basic_registry<Content>;
@ -26,6 +27,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
NGCFT1& _nft; NGCFT1& _nft;
ToxContactModel2& _tcm; ToxContactModel2& _tcm;
std::minstd_rand _rng {1337*11};
// registry per group? // registry per group?
ContentRegistry _contentr; ContentRegistry _contentr;
@ -64,10 +67,10 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
struct ReceivingTransfer { struct ReceivingTransfer {
struct Info { struct Info {
ContentHandle content;
// copy of info data // copy of info data
// too large? // too large?
std::vector<uint8_t> info_data; std::vector<uint8_t> info_data;
// content?
}; };
struct Chunk { struct Chunk {
@ -84,6 +87,10 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
// key is groupid + peerid // key is groupid + peerid
entt::dense_map<uint64_t, entt::dense_map<uint8_t, ReceivingTransfer>> _receiving_transfers; entt::dense_map<uint64_t, entt::dense_map<uint8_t, ReceivingTransfer>> _receiving_transfers;
// makes request rotate around open content
std::deque<ContentHandle> _queue_content_want_info;
std::deque<ContentHandle> _queue_content_want_chunk;
static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number); static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number);
void updateMessages(ContentHandle ce); void updateMessages(ContentHandle ce);