Compare commits

..

2 Commits

4 changed files with 61 additions and 97 deletions

View File

@ -35,7 +35,8 @@ void ChunkPicker::updateParticipation(
std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests( std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
Contact3Handle c, Contact3Handle c,
ObjectRegistry& objreg, ObjectRegistry& objreg,
ReceivingTransfers& rt ReceivingTransfers& rt,
const size_t open_requests
//NGCFT1& nft //NGCFT1& nft
) { ) {
if (!static_cast<bool>(c)) { if (!static_cast<bool>(c)) {
@ -47,21 +48,23 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
} }
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>(); const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
if (participating_unfinished.empty()) {
participating_in_last = entt::null;
return {};
}
std::vector<ContentChunkR> req_ret; std::vector<ContentChunkR> req_ret;
// count running tf and open requests // count running tf and open requests
const size_t num_ongoing_transfers = rt.sizePeer(group_number, peer_number); const size_t num_ongoing_transfers = rt.sizePeer(group_number, peer_number);
// TODO: account for open requests // TODO: account for open requests
const int64_t num_total = num_ongoing_transfers + open_requests;
// TODO: base max on rate(chunks per sec), gonna be ass with variable chunk size // TODO: base max on rate(chunks per sec), gonna be ass with variable chunk size
const size_t num_requests = std::max<int64_t>(0, max_tf_chunk_requests-num_ongoing_transfers); const size_t num_requests = std::max<int64_t>(0, int64_t(max_tf_chunk_requests)-num_total);
std::cerr << "CP: want " << num_requests << "(rt:" << num_ongoing_transfers << " or:" << open_requests << ") from " << group_number << ":" << peer_number << "\n";
// while n < X // while n < X
if (participating_unfinished.empty()) {
participating_in_last = entt::null;
return {};
}
// round robin content (remember last obj) // round robin content (remember last obj)
if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) { if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
participating_in_last = participating_unfinished.begin()->first; participating_in_last = participating_unfinished.begin()->first;
@ -133,28 +136,29 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
// - arbitrary priority maps/functions (and combine with above in rations) // - arbitrary priority maps/functions (and combine with above in rations)
// simple, we use first // simple, we use first
// TODO: optimize simple and start at first chunk we dont have
for (size_t i = 0; i < total_chunks && req_ret.size() < num_requests && i < chunk_candidates.size_bits(); i++) { for (size_t i = 0; i < total_chunks && req_ret.size() < num_requests && i < chunk_candidates.size_bits(); i++) {
if (!chunk_candidates[i]) { if (!chunk_candidates[i]) {
continue; continue;
} }
// i is a candidate we can request form peer // i is a potential candidate we can request form peer
// first check against double requests // - check against double requests
if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const auto& x) -> bool { if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const ContentChunkR& x) -> bool {
return false; return x.object == o && x.chunk_index == i;
}) != req_ret.cend()) { }) != req_ret.cend()) {
// already in return array // already in return array
// how did we get here? should we fast exit? if simple-first strat, we would want to // how did we get here? should we fast exit? if simple-first strat, we would want to
continue; // skip continue; // skip
} }
// second check against global requests (this might differ based on strat) // - check against global requests (this might differ based on strat)
if (requested_chunks.count(i) != 0) { if (requested_chunks.count(i) != 0) {
continue; continue;
} }
// third we check against globally running transfers (this might differ based on strat) // - we check against globally running transfers (this might differ based on strat)
if (rt.containsChunk(o, i)) { if (rt.containsChunk(o, i)) {
continue; continue;
} }
@ -162,11 +166,16 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
// if nothing else blocks this, add to ret // if nothing else blocks this, add to ret
req_ret.push_back(ContentChunkR{o, i}); req_ret.push_back(ContentChunkR{o, i});
assert(requested_chunks.count(i) == 0); // TODO: move this after packet was sent successfully
requested_chunks[i] = 0.f; // (move net in? hmm)
requested_chunks[i] = Components::FT1ChunkSHA1Requested::Entry{0.f, c};
} }
} }
if (req_ret.size() < num_requests) {
std::cerr << "CP: could not fulfil, " << group_number << ":" << peer_number << " only has " << req_ret.size() << " candidates\n";
}
// -- no -- (just compat with old code, ignore) // -- no -- (just compat with old code, ignore)
// if n < X // if n < X
// optimistically request 1 chunk other does not have // optimistically request 1 chunk other does not have

View File

@ -22,12 +22,12 @@
struct ChunkPicker { struct ChunkPicker {
// max transfers // max transfers
static constexpr size_t max_tf_info_requests {1}; static constexpr size_t max_tf_info_requests {1};
static constexpr size_t max_tf_chunk_requests {2}; static constexpr size_t max_tf_chunk_requests {3};
// max outstanding requests //// max outstanding requests
// TODO: should this include transfers? //// TODO: should this include transfers?
static constexpr size_t max_open_info_requests {1}; //static constexpr size_t max_open_info_requests {1};
const size_t max_open_chunk_requests {6}; //const size_t max_open_chunk_requests {6};
// TODO: handle with hash utils? // TODO: handle with hash utils?
struct ParticipationEntry { struct ParticipationEntry {
@ -62,7 +62,8 @@ struct ChunkPicker {
[[nodiscard]] std::vector<ContentChunkR> updateChunkRequests( [[nodiscard]] std::vector<ContentChunkR> updateChunkRequests(
Contact3Handle c, Contact3Handle c,
ObjectRegistry& objreg, ObjectRegistry& objreg,
ReceivingTransfers& rt ReceivingTransfers& rt,
const size_t open_requests
//NGCFT1& nft //NGCFT1& nft
); );

View File

@ -49,7 +49,11 @@ namespace Components {
struct FT1ChunkSHA1Requested { struct FT1ChunkSHA1Requested {
// requested chunks with a timer since last request // requested chunks with a timer since last request
entt::dense_map<size_t, float> chunks; struct Entry {
float timer {0.f};
Contact3 c {entt::null};
};
entt::dense_map<size_t, Entry> chunks;
}; };
// TODO: once announce is shipped, remove the "Suspected" // TODO: once announce is shipped, remove the "Suspected"

View File

@ -217,6 +217,7 @@ SHA1_NGCFT1::SHA1_NGCFT1(
} }
void SHA1_NGCFT1::iterate(float delta) { void SHA1_NGCFT1::iterate(float delta) {
std::cerr << "---------- new tick ----------\n";
// info builder queue // info builder queue
if (_info_builder_dirty) { if (_info_builder_dirty) {
std::lock_guard l{_info_builder_queue_mutex}; std::lock_guard l{_info_builder_queue_mutex};
@ -229,6 +230,8 @@ void SHA1_NGCFT1::iterate(float delta) {
_info_builder_queue.clear(); _info_builder_queue.clear();
} }
entt::dense_map<Contact3, size_t> _peer_open_requests;
{ // timers { // timers
// sending transfers // sending transfers
for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) { for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) {
@ -287,14 +290,15 @@ void SHA1_NGCFT1::iterate(float delta) {
} }
} }
{ // requested chunk timers { // requested chunk timers
_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) { _os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta, &_peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) { for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
it->second += delta; it->second.timer += delta;
// 15sec, TODO: config // 15sec, TODO: config
if (it->second >= 15.f) { if (it->second.timer >= 15.f) {
it = ftchunk_requested.chunks.erase(it); it = ftchunk_requested.chunks.erase(it);
} else { } else {
_peer_open_requests[it->second.c] += 1;
it++; it++;
} }
} }
@ -387,76 +391,10 @@ void SHA1_NGCFT1::iterate(float delta) {
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n"; std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
} }
#if 0
} else if (!_queue_content_want_chunk.empty()) {
const auto ce = _queue_content_want_chunk.front();
auto& requested_chunks = ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
if (requested_chunks.size() < _max_pending_requests) {
// select chunk/make sure we still need one
auto selected_peer_opt = selectPeerForRequest(ce);
if (selected_peer_opt.has_value()) {
const auto [group_number, peer_number] = selected_peer_opt.value();
//std::cout << "SHA1_NGCFT1: should ask " << group_number << ":" << peer_number << " for content here\n";
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
const auto& info = ce.get<Components::FT1InfoSHA1>();
if (cc.have_all) {
_queue_content_want_chunk.pop_front();
} else {
// naive, choose first chunk we dont have (double requests!!)
// TODO: piece picker, choose what other have (invert selectPeerForRequest)
for (size_t chunk_idx = 0; chunk_idx < info.chunks.size() /* cc.total_ */; chunk_idx++) {
if (cc.have_chunk[chunk_idx]) {
continue;
}
// check by hash
if (cc.haveChunk(info.chunks.at(chunk_idx))) {
// TODO: fix this, a completed chunk should fill all the indecies it occupies
cc.have_chunk.set(chunk_idx);
cc.have_count += 1;
if (cc.have_count == info.chunks.size()) {
cc.have_all = true;
cc.have_chunk = BitSet(0); // conserve space
break;
}
continue;
}
if (requested_chunks.count(chunk_idx)) {
// already requested
continue;
}
// request chunk_idx
_nft.NGC_FT1_send_request_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size()
);
requested_chunks[chunk_idx] = 0.f;
std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n";
break;
}
// ...
// TODO: properly determine
if (!cc.have_all) {
_queue_content_want_chunk.push_back(ce);
}
_queue_content_want_chunk.pop_front();
}
}
}
#endif
} }
// new chunk picker code // new chunk picker code
_cr.view<ChunkPicker>().each([this](const Contact3 cv, ChunkPicker& cp) { _cr.view<ChunkPicker>().each([this, &_peer_open_requests](const Contact3 cv, ChunkPicker& cp) {
Contact3Handle c{_cr, cv}; Contact3Handle c{_cr, cv};
// HACK: expensive, dont do every tick, only on events // HACK: expensive, dont do every tick, only on events
// do verification in debug instead? // do verification in debug instead?
@ -467,10 +405,16 @@ void SHA1_NGCFT1::iterate(float delta) {
assert(!cp.participating.empty()); assert(!cp.participating.empty());
size_t peer_open_request = 0;
if (_peer_open_requests.contains(c)) {
peer_open_request += _peer_open_requests.at(c);
}
auto new_requests = cp.updateChunkRequests( auto new_requests = cp.updateChunkRequests(
c, c,
_os.registry(), _os.registry(),
_receiving_transfers _receiving_transfers,
peer_open_request
); );
if (new_requests.empty()) { if (new_requests.empty()) {
@ -796,6 +740,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
e.accept = true; e.accept = true;
// now running, remove from requested
for (const auto it : _receiving_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id).getChunk().chunk_indices) {
o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
}
std::cout << "SHA1_NGCFT1: accepted chunk [" << SHA1Digest{sha1_chunk_hash} << "]\n"; std::cout << "SHA1_NGCFT1: accepted chunk [" << SHA1Digest{sha1_chunk_hash} << "]\n";
} else { } else {
assert(false && "unhandled case"); assert(false && "unhandled case");
@ -1019,12 +968,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
if (!cc.have_all) { // debug print self have set if (!cc.have_all) { // debug print self have set
std::cout << "DEBUG print have bitset: s:" << cc.have_chunk.size_bits(); std::cout << "DEBUG print have bitset: s:" << cc.have_chunk.size_bits();
for (size_t i = 0; i < cc.have_chunk.size_bytes(); i++) { for (size_t i = 0; i < cc.have_chunk.size_bytes(); i++) {
if (i % 16 == 0) { if (i % 32 == 0) {
std::cout << "\n"; printf("\n");
} }
std::cout << std::hex << (uint16_t)cc.have_chunk.data()[i] << " "; // f cout
printf("%.2x", (uint16_t)cc.have_chunk.data()[i]);
} }
std::cout << std::dec << "\n"; printf("\n");
} }
} else { } else {
std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n"; std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";