properly account for open requests when determining how much to request
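
The per-chunk request timers already track which requests are still outstanding; this change reuses that pass to build a per-peer tally (_peer_open_requests) each tick and hands that tally to ChunkPicker::updateChunkRequests, so the picker can take already-in-flight requests into account when deciding how much more to ask a peer for. Requests that time out (15s, still hard-coded) are dropped as before and therefore stop counting against the peer, and the long-dead #if 0 chunk-request path in iterate() is removed along the way.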
@@ -217,6 +217,7 @@ SHA1_NGCFT1::SHA1_NGCFT1(
 }

 void SHA1_NGCFT1::iterate(float delta) {
+	std::cerr << "---------- new tick ----------\n";
 	// info builder queue
 	if (_info_builder_dirty) {
 		std::lock_guard l{_info_builder_queue_mutex};
@@ -229,6 +230,8 @@ void SHA1_NGCFT1::iterate(float delta) {
 		_info_builder_queue.clear();
 	}

+	entt::dense_map<Contact3, size_t> _peer_open_requests;
+
 	{ // timers
 		// sending transfers
 		for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) {
@@ -287,14 +290,15 @@ void SHA1_NGCFT1::iterate(float delta) {
 			}
 		}
 		{ // requested chunk timers
-			_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
+			_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta, &_peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
 				for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
-					it->second += delta;
+					it->second.timer += delta;

 					// 15sec, TODO: config
-					if (it->second >= 15.f) {
+					if (it->second.timer >= 15.f) {
 						it = ftchunk_requested.chunks.erase(it);
 					} else {
+						_peer_open_requests[it->second.c] += 1;
 						it++;
 					}
 				}
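
Note that the value type of Components::FT1ChunkSHA1Requested::chunks is no longer a bare float timer: the loop above reads it->second.timer and it->second.c (the contact the chunk was requested from). The component's new definition is not part of this page; a minimal shape that would satisfy this usage, with hypothetical names, looks like:

	// hypothetical entry type, inferred from the usage in the hunk above; the real
	// definition of Components::FT1ChunkSHA1Requested lives elsewhere in this commit
	struct ChunkRequestEntry {
		float timer {0.f}; // seconds since the request was sent, aged every iterate()
		Contact3 c {};     // peer the chunk was requested from
	};
	// chunks would then map chunk index -> ChunkRequestEntry instead of chunk index -> float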
@@ -387,76 +391,10 @@ void SHA1_NGCFT1::iterate(float delta) {

 			std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
 		}
-#if 0
-	} else if (!_queue_content_want_chunk.empty()) {
-		const auto ce = _queue_content_want_chunk.front();
-
-		auto& requested_chunks = ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
-		if (requested_chunks.size() < _max_pending_requests) {
-
-			// select chunk/make sure we still need one
-			auto selected_peer_opt = selectPeerForRequest(ce);
-			if (selected_peer_opt.has_value()) {
-				const auto [group_number, peer_number] = selected_peer_opt.value();
-				//std::cout << "SHA1_NGCFT1: should ask " << group_number << ":" << peer_number << " for content here\n";
-				auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
-				const auto& info = ce.get<Components::FT1InfoSHA1>();
-
-				if (cc.have_all) {
-					_queue_content_want_chunk.pop_front();
-				} else {
-					// naive, choose first chunk we dont have (double requests!!)
-					// TODO: piece picker, choose what other have (invert selectPeerForRequest)
-					for (size_t chunk_idx = 0; chunk_idx < info.chunks.size() /* cc.total_ */; chunk_idx++) {
-						if (cc.have_chunk[chunk_idx]) {
-							continue;
-						}
-
-						// check by hash
-						if (cc.haveChunk(info.chunks.at(chunk_idx))) {
-							// TODO: fix this, a completed chunk should fill all the indecies it occupies
-							cc.have_chunk.set(chunk_idx);
-							cc.have_count += 1;
-							if (cc.have_count == info.chunks.size()) {
-								cc.have_all = true;
-								cc.have_chunk = BitSet(0); // conserve space
-								break;
-							}
-							continue;
-						}
-
-						if (requested_chunks.count(chunk_idx)) {
-							// already requested
-							continue;
-						}
-
-						// request chunk_idx
-						_nft.NGC_FT1_send_request_private(
-							group_number, peer_number,
-							static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
-							info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size()
-						);
-						requested_chunks[chunk_idx] = 0.f;
-						std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n";
-
-						break;
-					}
-
-					// ...
-
-					// TODO: properly determine
-					if (!cc.have_all) {
-						_queue_content_want_chunk.push_back(ce);
-					}
-					_queue_content_want_chunk.pop_front();
-				}
-			}
-		}
-#endif
 	}

 	// new chunk picker code
-	_cr.view<ChunkPicker>().each([this](const Contact3 cv, ChunkPicker& cp) {
+	_cr.view<ChunkPicker>().each([this, &_peer_open_requests](const Contact3 cv, ChunkPicker& cp) {
 		Contact3Handle c{_cr, cv};
 		// HACK: expensive, dont do every tick, only on events
 		// do verification in debug instead?
@@ -467,10 +405,16 @@ void SHA1_NGCFT1::iterate(float delta) {

 		assert(!cp.participating.empty());

+		size_t peer_open_request = 0;
+		if (_peer_open_requests.contains(c)) {
+			peer_open_request += _peer_open_requests.at(c);
+		}
+
 		auto new_requests = cp.updateChunkRequests(
 			c,
 			_os.registry(),
-			_receiving_transfers
+			_receiving_transfers,
+			peer_open_request
 		);

 		if (new_requests.empty()) {
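
updateChunkRequests gains a fourth argument carrying the number of requests already open for the peer. What the picker does with it is not shown in this diff; the obvious use, sketched below with an assumed helper name and an assumed per-peer cap, is to subtract the open count from the request budget before picking new chunks:

	#include <cstddef>

	// Sketch only: the budget arithmetic this parameter enables. The cap of 10
	// and the helper name are assumptions, not taken from this commit.
	static std::size_t remainingRequestBudget(std::size_t open_requests, std::size_t max_requests = 10) {
		return open_requests < max_requests ? max_requests - open_requests : 0;
	}

Called with the peer_open_request value computed above, this would cap how many new chunk requests the picker emits for that peer in a single tick.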
@@ -796,6 +740,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {

 		e.accept = true;

+		// now running, remove from requested
+		for (const auto it : _receiving_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id).getChunk().chunk_indices) {
+			o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
+		}
+
 		std::cout << "SHA1_NGCFT1: accepted chunk [" << SHA1Digest{sha1_chunk_hash} << "]\n";
 	} else {
 		assert(false && "unhandled case");
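
The last hunk closes the loop on the receiving side: once an incoming chunk transfer is accepted, every chunk index covered by that transfer is erased from Components::FT1ChunkSHA1Requested, so those chunks stop counting as open requests on the next tick and are tracked through _receiving_transfers instead, which updateChunkRequests also receives.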