From 7812e3a413ca056a3c59a5f1445740ff6aef78f4 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Tue, 15 Aug 2023 16:06:54 +0200 Subject: [PATCH] receive info --- src/ngcft1.cpp | 13 ++++ src/sha1_ngcft1.cpp | 157 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 155 insertions(+), 15 deletions(-) diff --git a/src/ngcft1.cpp b/src/ngcft1.cpp index 8e36454..32aff77 100644 --- a/src/ngcft1.cpp +++ b/src/ngcft1.cpp @@ -560,6 +560,19 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) { sendPKG_FT1_DATA_ACK(e.group_number, e.peer_number, e.transfer_id, ack_seq_ids.data(), ack_seq_ids.size()); } + + if (transfer.file_size_current == transfer.file_size) { + // TODO: set all data received, and clean up + //transfer.state = Group::Peer::RecvTransfer::State::RECV; + dispatch( + NGCFT1_Event::recv_done, + Events::NGCFT1_recv_done{ + e.group_number, e.peer_number, + e.transfer_id + } + ); + } + return true; } diff --git a/src/sha1_ngcft1.cpp b/src/sha1_ngcft1.cpp index 7bc1a72..d4f1082 100644 --- a/src/sha1_ngcft1.cpp +++ b/src/sha1_ngcft1.cpp @@ -126,6 +126,11 @@ void SHA1_NGCFT1::updateMessages(ContentHandle ce) { if (ce.all_of()) { msg.emplace_or_replace(ce.get()); } + if (ce.all_of()) { + msg.emplace_or_replace(); + } else { + msg.remove(); + } if (auto* cc = ce.try_get(); cc != nullptr && cc->have_all) { msg.emplace_or_replace(); } @@ -169,7 +174,6 @@ void SHA1_NGCFT1::iterate(float delta) { // if we have not heard for 10sec, timeout if (it->second.time_since_activity >= 10.f) { - //std::cerr << "SHA1_NGCFT1 warning: sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n"; std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n"; it = peer_it->second.erase(it); } else { @@ -184,18 +188,30 @@ void SHA1_NGCFT1::iterate(float delta) { peer_it++; } } - //for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end();) { - //float& time_since_remove_activity = std::get(*it); - //time_since_remove_activity += delta; - //// if we have not heard for 10sec, timeout - //if (time_since_remove_activity >= 10.f) { - //std::cerr << "SHA1 sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n"; - //it = _transfers_sending_chunk.erase(it); - //} else { - //it++; - //} - //} + // receiving transfers + for (auto peer_it = _receiving_transfers.begin(); peer_it != _receiving_transfers.end();) { + for (auto it = peer_it->second.begin(); it != peer_it->second.end();) { + it->second.time_since_activity += delta; + + // if we have not heard for 10sec, timeout + if (it->second.time_since_activity >= 10.f) { + std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n"; + // TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading + //it->second.v + it = peer_it->second.erase(it); + } else { + it++; + } + } + + if (peer_it->second.empty()) { + // cleanup unused peers too agressive? + peer_it = _receiving_transfers.erase(peer_it); + } else { + peer_it++; + } + } // queued requests for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) { @@ -445,11 +461,64 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) { return false; } - return false; + // TODO: make sure we requested this? + + if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) { + SHA1Digest sha1_info_hash {e.file_id, e.file_id_size}; + if (!_info_to_content.count(sha1_info_hash)) { + // no idea about this content + return false; + } + + auto ce = _info_to_content.at(sha1_info_hash); + + if (ce.any_of()) { + // we already have the info (should) + return false; + } + + // TODO: check if e.file_size too large / ask for permission + if (e.file_size > 100*1024*1024) { + // a info size of 100MiB is ~640GiB for a 128KiB chunk size (default) + return false; + } + + _receiving_transfers + [combineIds(e.group_number, e.peer_number)] + [e.transfer_id] + .v = ReceivingTransfer::Info{ce, std::vector(e.file_size)}; + + e.accept = true; + } else { + // TODO assert + return false; + } + + return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) { - return false; + if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) { + return false; + } + + auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number)); + if (!peer_transfers.count(e.transfer_id)) { + return false; + } + + auto& tv = peer_transfers[e.transfer_id].v; + if (std::holds_alternative(tv)) { + auto& info_data = std::get(tv).info_data; + for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) { + info_data[i+e.data_offset] = e.data[i]; + } + } else { + // TODO: handle chunks + // error/ assert + } + + return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { @@ -502,7 +571,65 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { - return false; + if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) { + return false; + } + + auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number)); + if (!peer_transfers.count(e.transfer_id)) { + return false; + } + + const auto& tv = peer_transfers[e.transfer_id].v; + if (std::holds_alternative(tv)) { + auto& info = std::get(tv); + auto ce = info.content; + + if (ce.any_of()) { + // we already have the info, discard + peer_transfers.erase(e.transfer_id); + return true; + } + + // check if data matches hash + auto hash = hash_sha1(info.info_data.data(), info.info_data.size()); + + assert(ce.all_of()); + if (ce.get().hash != hash) { + std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n"; + // requeue info request + peer_transfers.erase(e.transfer_id); + return true; + } + + const auto& info_data = ce.emplace_or_replace(std::move(info.info_data)).data; + auto& ft_info = ce.emplace_or_replace(); + ft_info.fromBuffer(info_data); + + { // file info + // TODO: not overwrite fi? since same? + auto& file_info = ce.emplace_or_replace(); + file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size}; + file_info.total_size = ft_info.file_size; + } + + std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n"; + + ce.remove(); + if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) { + _queue_content_want_info.erase(it); + } + + ce.emplace_or_replace(); + + updateMessages(ce); + } else if (std::holds_alternative(tv)) { + updateMessages(std::get(tv).content); // mostly for received bytes + } + + peer_transfers.erase(e.transfer_id); + + return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {