limit pending requests
This commit is contained in:
parent
099abc3a09
commit
845586c2db
@ -427,59 +427,61 @@ void SHA1_NGCFT1::iterate(float delta) {
|
|||||||
} else if (!_queue_content_want_chunk.empty()) {
|
} else if (!_queue_content_want_chunk.empty()) {
|
||||||
const auto ce = _queue_content_want_chunk.front();
|
const auto ce = _queue_content_want_chunk.front();
|
||||||
|
|
||||||
// select chunk/make sure we still need one
|
auto& requested_chunks = ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
|
||||||
|
if (requested_chunks.size() < _max_pending_requests) {
|
||||||
|
|
||||||
auto selected_peer_opt = selectPeerForRequest(ce);
|
// select chunk/make sure we still need one
|
||||||
if (selected_peer_opt.has_value()) {
|
auto selected_peer_opt = selectPeerForRequest(ce);
|
||||||
const auto [group_number, peer_number] = selected_peer_opt.value();
|
if (selected_peer_opt.has_value()) {
|
||||||
//std::cout << "SHA1_NGCFT1: should ask " << group_number << ":" << peer_number << " for content here\n";
|
const auto [group_number, peer_number] = selected_peer_opt.value();
|
||||||
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
|
//std::cout << "SHA1_NGCFT1: should ask " << group_number << ":" << peer_number << " for content here\n";
|
||||||
const auto& info = ce.get<Components::FT1InfoSHA1>();
|
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
|
||||||
|
const auto& info = ce.get<Components::FT1InfoSHA1>();
|
||||||
|
|
||||||
// naive, choose first chunk we dont have (double requests!!)
|
// naive, choose first chunk we dont have (double requests!!)
|
||||||
for (size_t chunk_idx = 0; chunk_idx < cc.have_chunk.size(); chunk_idx++) {
|
for (size_t chunk_idx = 0; chunk_idx < cc.have_chunk.size(); chunk_idx++) {
|
||||||
if (cc.have_chunk[chunk_idx]) {
|
if (cc.have_chunk[chunk_idx]) {
|
||||||
continue;
|
continue;
|
||||||
}
|
|
||||||
|
|
||||||
// check by hash
|
|
||||||
if (cc.haveChunk(info.chunks.at(chunk_idx))) {
|
|
||||||
// TODO: fix this, a completed chunk should fill all the indecies it occupies
|
|
||||||
cc.have_chunk[chunk_idx] = true;
|
|
||||||
cc.have_count += 1;
|
|
||||||
if (cc.have_count == info.chunks.size()) {
|
|
||||||
cc.have_all = true;
|
|
||||||
cc.have_chunk.clear();
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
continue;
|
|
||||||
|
// check by hash
|
||||||
|
if (cc.haveChunk(info.chunks.at(chunk_idx))) {
|
||||||
|
// TODO: fix this, a completed chunk should fill all the indecies it occupies
|
||||||
|
cc.have_chunk[chunk_idx] = true;
|
||||||
|
cc.have_count += 1;
|
||||||
|
if (cc.have_count == info.chunks.size()) {
|
||||||
|
cc.have_all = true;
|
||||||
|
cc.have_chunk.clear();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (requested_chunks.count(chunk_idx)) {
|
||||||
|
// already requested
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// request chunk_idx
|
||||||
|
_nft.NGC_FT1_send_request_private(
|
||||||
|
group_number, peer_number,
|
||||||
|
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
|
||||||
|
info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size()
|
||||||
|
);
|
||||||
|
requested_chunks[chunk_idx] = 0.f;
|
||||||
|
std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n";
|
||||||
|
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto& requested_chunks = ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
|
// ...
|
||||||
if (requested_chunks.count(chunk_idx)) {
|
|
||||||
// already requested
|
// TODO: properly determine
|
||||||
continue;
|
if (!cc.have_all) {
|
||||||
|
_queue_content_want_chunk.push_back(ce);
|
||||||
}
|
}
|
||||||
|
_queue_content_want_chunk.pop_front();
|
||||||
// request chunk_idx
|
|
||||||
_nft.NGC_FT1_send_request_private(
|
|
||||||
group_number, peer_number,
|
|
||||||
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
|
|
||||||
info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size()
|
|
||||||
);
|
|
||||||
requested_chunks[chunk_idx] = 0.f;
|
|
||||||
std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n";
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ...
|
|
||||||
|
|
||||||
// TODO: properly determine
|
|
||||||
if (!cc.have_all) {
|
|
||||||
_queue_content_want_chunk.push_back(ce);
|
|
||||||
}
|
|
||||||
_queue_content_want_chunk.pop_front();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -878,6 +880,12 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
|||||||
if (info.chunks.at(chunk_index) == got_hash) {
|
if (info.chunks.at(chunk_index) == got_hash) {
|
||||||
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
|
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
|
||||||
|
|
||||||
|
// remove from requested
|
||||||
|
// TODO: remove at init and track running transfers differently
|
||||||
|
for (const auto it : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
|
||||||
|
ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
|
||||||
|
}
|
||||||
|
|
||||||
if (!cc.have_all) {
|
if (!cc.have_all) {
|
||||||
for (const auto inner_chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
|
for (const auto inner_chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
|
||||||
if (!cc.have_all && !cc.have_chunk.at(inner_chunk_index)) {
|
if (!cc.have_all && !cc.have_chunk.at(inner_chunk_index)) {
|
||||||
|
@ -104,6 +104,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
|
|||||||
|
|
||||||
size_t _max_concurrent_in {4};
|
size_t _max_concurrent_in {4};
|
||||||
size_t _max_concurrent_out {6};
|
size_t _max_concurrent_out {6};
|
||||||
|
// TODO: probably also includes running transfers rn (meh)
|
||||||
|
size_t _max_pending_requests {16}; // per content
|
||||||
|
|
||||||
public:
|
public:
|
||||||
SHA1_NGCFT1(
|
SHA1_NGCFT1(
|
||||||
|
Loading…
Reference in New Issue
Block a user