receive info
This commit is contained in:
parent
4895a3c03a
commit
7812e3a413
@ -560,6 +560,19 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
|
||||
sendPKG_FT1_DATA_ACK(e.group_number, e.peer_number, e.transfer_id, ack_seq_ids.data(), ack_seq_ids.size());
|
||||
}
|
||||
|
||||
|
||||
if (transfer.file_size_current == transfer.file_size) {
|
||||
// TODO: set all data received, and clean up
|
||||
//transfer.state = Group::Peer::RecvTransfer::State::RECV;
|
||||
dispatch(
|
||||
NGCFT1_Event::recv_done,
|
||||
Events::NGCFT1_recv_done{
|
||||
e.group_number, e.peer_number,
|
||||
e.transfer_id
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -126,6 +126,11 @@ void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
|
||||
if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
|
||||
msg.emplace_or_replace<Message::Components::Transfer::BytesSent>(ce.get<Message::Components::Transfer::BytesSent>());
|
||||
}
|
||||
if (ce.all_of<Message::Components::Transfer::TagPaused>()) {
|
||||
msg.emplace_or_replace<Message::Components::Transfer::TagPaused>();
|
||||
} else {
|
||||
msg.remove<Message::Components::Transfer::TagPaused>();
|
||||
}
|
||||
if (auto* cc = ce.try_get<Components::FT1ChunkSHA1Cache>(); cc != nullptr && cc->have_all) {
|
||||
msg.emplace_or_replace<Message::Components::Transfer::TagHaveAll>();
|
||||
}
|
||||
@ -169,7 +174,6 @@ void SHA1_NGCFT1::iterate(float delta) {
|
||||
|
||||
// if we have not heard for 10sec, timeout
|
||||
if (it->second.time_since_activity >= 10.f) {
|
||||
//std::cerr << "SHA1_NGCFT1 warning: sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n";
|
||||
std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n";
|
||||
it = peer_it->second.erase(it);
|
||||
} else {
|
||||
@ -184,18 +188,30 @@ void SHA1_NGCFT1::iterate(float delta) {
|
||||
peer_it++;
|
||||
}
|
||||
}
|
||||
//for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end();) {
|
||||
//float& time_since_remove_activity = std::get<float>(*it);
|
||||
//time_since_remove_activity += delta;
|
||||
|
||||
//// if we have not heard for 10sec, timeout
|
||||
//if (time_since_remove_activity >= 10.f) {
|
||||
//std::cerr << "SHA1 sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n";
|
||||
//it = _transfers_sending_chunk.erase(it);
|
||||
//} else {
|
||||
//it++;
|
||||
//}
|
||||
//}
|
||||
// receiving transfers
|
||||
for (auto peer_it = _receiving_transfers.begin(); peer_it != _receiving_transfers.end();) {
|
||||
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
|
||||
it->second.time_since_activity += delta;
|
||||
|
||||
// if we have not heard for 10sec, timeout
|
||||
if (it->second.time_since_activity >= 10.f) {
|
||||
std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n";
|
||||
// TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading
|
||||
//it->second.v
|
||||
it = peer_it->second.erase(it);
|
||||
} else {
|
||||
it++;
|
||||
}
|
||||
}
|
||||
|
||||
if (peer_it->second.empty()) {
|
||||
// cleanup unused peers too agressive?
|
||||
peer_it = _receiving_transfers.erase(peer_it);
|
||||
} else {
|
||||
peer_it++;
|
||||
}
|
||||
}
|
||||
|
||||
// queued requests
|
||||
for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) {
|
||||
@ -445,11 +461,64 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: make sure we requested this?
|
||||
|
||||
if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) {
|
||||
SHA1Digest sha1_info_hash {e.file_id, e.file_id_size};
|
||||
if (!_info_to_content.count(sha1_info_hash)) {
|
||||
// no idea about this content
|
||||
return false;
|
||||
}
|
||||
|
||||
auto ce = _info_to_content.at(sha1_info_hash);
|
||||
|
||||
if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data, Components::FT1ChunkSHA1Cache>()) {
|
||||
// we already have the info (should)
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: check if e.file_size too large / ask for permission
|
||||
if (e.file_size > 100*1024*1024) {
|
||||
// a info size of 100MiB is ~640GiB for a 128KiB chunk size (default)
|
||||
return false;
|
||||
}
|
||||
|
||||
_receiving_transfers
|
||||
[combineIds(e.group_number, e.peer_number)]
|
||||
[e.transfer_id]
|
||||
.v = ReceivingTransfer::Info{ce, std::vector<uint8_t>(e.file_size)};
|
||||
|
||||
e.accept = true;
|
||||
} else {
|
||||
// TODO assert
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
|
||||
if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
|
||||
if (!peer_transfers.count(e.transfer_id)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto& tv = peer_transfers[e.transfer_id].v;
|
||||
if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
|
||||
auto& info_data = std::get<ReceivingTransfer::Info>(tv).info_data;
|
||||
for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
|
||||
info_data[i+e.data_offset] = e.data[i];
|
||||
}
|
||||
} else {
|
||||
// TODO: handle chunks
|
||||
// error/ assert
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
|
||||
@ -502,7 +571,65 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
|
||||
}
|
||||
|
||||
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
|
||||
if (!peer_transfers.count(e.transfer_id)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const auto& tv = peer_transfers[e.transfer_id].v;
|
||||
if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
|
||||
auto& info = std::get<ReceivingTransfer::Info>(tv);
|
||||
auto ce = info.content;
|
||||
|
||||
if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data>()) {
|
||||
// we already have the info, discard
|
||||
peer_transfers.erase(e.transfer_id);
|
||||
return true;
|
||||
}
|
||||
|
||||
// check if data matches hash
|
||||
auto hash = hash_sha1(info.info_data.data(), info.info_data.size());
|
||||
|
||||
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
|
||||
if (ce.get<Components::FT1InfoSHA1Hash>().hash != hash) {
|
||||
std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n";
|
||||
// requeue info request
|
||||
peer_transfers.erase(e.transfer_id);
|
||||
return true;
|
||||
}
|
||||
|
||||
const auto& info_data = ce.emplace_or_replace<Components::FT1InfoSHA1Data>(std::move(info.info_data)).data;
|
||||
auto& ft_info = ce.emplace_or_replace<Components::FT1InfoSHA1>();
|
||||
ft_info.fromBuffer(info_data);
|
||||
|
||||
{ // file info
|
||||
// TODO: not overwrite fi? since same?
|
||||
auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
|
||||
file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
|
||||
file_info.total_size = ft_info.file_size;
|
||||
}
|
||||
|
||||
std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";
|
||||
|
||||
ce.remove<Components::ReRequestInfoTimer>();
|
||||
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) {
|
||||
_queue_content_want_info.erase(it);
|
||||
}
|
||||
|
||||
ce.emplace_or_replace<Message::Components::Transfer::TagPaused>();
|
||||
|
||||
updateMessages(ce);
|
||||
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
|
||||
updateMessages(std::get<ReceivingTransfer::Chunk>(tv).content); // mostly for received bytes
|
||||
}
|
||||
|
||||
peer_transfers.erase(e.transfer_id);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
|
||||
|
Loading…
Reference in New Issue
Block a user