receiving works!! but the very naive piece picker over-requests, making it super slow

Green Sky 2023-08-18 18:12:41 +02:00
parent 100f4f68c0
commit 8c221a3e17
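
The over-requesting mentioned above comes from the naive picker added in the first hunk below: every iterate() tick it asks the selected peer for the first chunk it does not yet have, regardless of whether that chunk was already requested. A minimal sketch of one way around that, keeping a per-content record of in-flight chunk requests; RequestedChunks, pickChunk and the timeout parameter are hypothetical names for illustration, not part of this commit:

#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// hypothetical per-content bookkeeping: chunk index -> seconds since the request went out
struct RequestedChunks {
    std::map<size_t, float> chunks;
};

// pick the first chunk that is neither present nor still in flight;
// returns -1 when every missing chunk already has an outstanding request
inline int64_t pickChunk(const std::vector<bool>& have_chunk, RequestedChunks& rc, float request_timeout) {
    for (size_t i = 0; i < have_chunk.size(); i++) {
        if (have_chunk[i]) {
            continue; // already have this chunk
        }
        const auto it = rc.chunks.find(i);
        if (it != rc.chunks.end() && it->second < request_timeout) {
            continue; // request still in flight, don't ask again yet
        }
        rc.chunks[i] = 0.f; // mark as requested; age this in iterate(delta)
        return static_cast<int64_t>(i);
    }
    return -1;
}

Entries would be aged by delta in iterate() and dropped (or retried) once they pass the timeout, so a lost request does not stall the transfer forever.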


@@ -391,10 +391,33 @@ void SHA1_NGCFT1::iterate(float delta) {
auto selected_peer_opt = selectPeerForRequest(ce);
if (selected_peer_opt.has_value()) {
const auto [group_number, peer_number] = selected_peer_opt.value();
//std::cout << "SHA1_NGCFT1: should ask " << group_number << ":" << peer_number << " for content here\n";
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
const auto& info = ce.get<Components::FT1InfoSHA1>();
// naive, choose first chunk we dont have (double requests!!)
for (size_t chunk_idx = 0; chunk_idx < cc.have_chunk.size(); chunk_idx++) {
if (cc.have_chunk[chunk_idx]) {
continue;
}
// request chunk_idx
_nft.NGC_FT1_send_request_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size()
);
std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n";
break;
}
// ...
// TODO: properly determine
if (!cc.have_all) {
_queue_content_want_chunk.push_back(ce);
}
_queue_content_want_chunk.pop_front();
}
}
@@ -443,6 +466,18 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
// next, create chunk cache and check for existing data
// TODO: check existing and dont truncate
{
auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
cc.have_all = false;
cc.have_count = 0;
for (size_t i = 0; i < info.chunks.size(); i++) {
cc.have_chunk.push_back(false);
_chunks[info.chunks[i]] = ce;
cc.chunk_hash_to_index[info.chunks[i]] = i;
}
}
// now, enqueue
_queue_content_want_chunk.push_back(ce);
@@ -568,6 +603,50 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
e.accept = true;
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
SHA1Digest sha1_chunk_hash {e.file_id, e.file_id_size};
if (!_chunks.count(sha1_chunk_hash)) {
// no idea about this content
return false;
}
auto ce = _chunks.at(sha1_chunk_hash);
// CHECK IF TRANSFER IN PROGRESS!!
{ // they have the content (probably, might be fake, should move this to done)
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
}
assert(ce.all_of<Components::FT1InfoSHA1>());
assert(ce.all_of<Components::FT1ChunkSHA1Cache>());
const auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
if (cc.haveChunk(sha1_chunk_hash)) {
std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
// we have the chunk
return false;
}
// TODO: cache position
// calc offset_into_file
auto idx_opt = cc.chunkIndex(sha1_chunk_hash);
assert(idx_opt.has_value());
const auto& info = ce.get<Components::FT1InfoSHA1>();
uint64_t offset_into_file = uint64_t(info.chunk_size) * idx_opt.value();
// TODO: check e.file_size
_receiving_transfers
[combineIds(e.group_number, e.peer_number)]
[e.transfer_id]
.v = ReceivingTransfer::Chunk{ce, offset_into_file};
e.accept = true;
std::cout << "SHA1_NGCFT1: accepted chunk [" << SHA1Digest{sha1_chunk_hash} << "]\n";
} else {
assert(false && "unhandled case");
}
@@ -727,8 +806,25 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
// check hash of chunk
auto got_hash = hash_sha1(chunk_data.data(), chunk_data.size());
if (info.chunks.at(chunk_index) == got_hash) {
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
// TODO: check for have all
if (!cc.have_all && !cc.have_chunk.at(chunk_index)) {
cc.have_chunk.at(chunk_index) = true;
cc.have_count += 1;
if (cc.have_count == info.chunks.size()) {
// debug check
for (const bool it : cc.have_chunk) {
assert(it);
}
cc.have_all = true;
cc.have_chunk.clear(); // not wasting memory
std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";
}
} else {
std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
}
// good chunk
ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size();
@@ -992,6 +1088,13 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
// TODO: we dont want chunks anymore
// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
auto it = _queue_content_want_chunk.begin();
while (
it != _queue_content_want_chunk.end() &&
(it = std::find(it, _queue_content_want_chunk.end(), ce)) != _queue_content_want_chunk.end()
) {
it = _queue_content_want_chunk.erase(it);
}
} else {
ce = {_contentr, _contentr.create()};
_info_to_content[sha1_info_hash] = ce;