refactor + wip chunk receive handling

This commit is contained in:
Green Sky 2023-08-16 01:43:12 +02:00
parent 80d0db546d
commit 8237f902c9
No known key found for this signature in database
5 changed files with 129 additions and 81 deletions

@ -1 +1 @@
Subproject commit d2d6cfbf5326de74cbac2dcd11be74df2cf79d2a
Subproject commit c73e429df138ce32f5cdb5588a3af8257a946680

View File

@ -28,6 +28,15 @@ std::ostream& operator<<(std::ostream& out, const SHA1Digest& v) {
return out;
}
size_t FT1InfoSHA1::chunkSize(size_t chunk_index) const {
if (chunk_index+1 == chunks.size()) {
// last chunk
return file_size - chunk_index * chunk_size;
} else {
return chunk_size;
}
}
std::vector<uint8_t> FT1InfoSHA1::toBuffer(void) const {
std::vector<uint8_t> buffer;

View File

@ -46,6 +46,8 @@ struct FT1InfoSHA1 {
uint32_t chunk_size {128*1024}; // 128KiB for now
std::vector<SHA1Digest> chunks;
size_t chunkSize(size_t chunk_index) const;
std::vector<uint8_t> toBuffer(void) const;
void fromBuffer(const std::vector<uint8_t>& buffer);
};

View File

@ -49,7 +49,6 @@ namespace Components {
entt::dense_map<SHA1Digest, size_t> chunk_hash_to_index;
std::optional<size_t> chunkIndex(const SHA1Digest& hash) const;
size_t chunkSize(size_t chunk_index) const;
bool haveChunk(const SHA1Digest& hash) const;
};
@ -139,6 +138,64 @@ void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
}
}
std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ContentHandle ce) {
// get a list of peers we can request this file from
// TODO: randomly request from non SuspectedParticipants
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
// TODO: sort by con state?
// prio to direct?
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
}
}
// 1 in 20 chance to ask random peer instead
// TODO: config + tweak
// TODO: save group in content to avoid the tox_peers list build
if (tox_peers.empty() || (_rng()%20) == 0) {
// meh
// HACK: determain group based on last tox_peers
if (!tox_peers.empty()) {
const uint32_t group_number = tox_peers.back().first;
auto gch = _tcm.getContactGroup(group_number);
assert(static_cast<bool>(gch));
std::vector<uint32_t> un_tox_peers;
for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
un_tox_peers.push_back(tgpe.peer_number);
}
}
if (un_tox_peers.empty()) {
// no one online, we are out of luck
} else {
const size_t sample_i = _rng()%un_tox_peers.size();
const auto peer_number = un_tox_peers.at(sample_i);
return std::make_pair(group_number, peer_number);
}
}
} else {
const size_t sample_i = _rng()%tox_peers.size();
const auto [group_number, peer_number] = tox_peers.at(sample_i);
return std::make_pair(group_number, peer_number);
}
return std::nullopt;
}
SHA1_NGCFT1::SHA1_NGCFT1(
Contact3Registry& cr,
RegistryMessageModel& rmm,
@ -295,68 +352,9 @@ void SHA1_NGCFT1::iterate(float delta) {
assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
// get a list of peers we can request this file from
// TODO: randomly request from non SuspectedParticipants
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
// TODO: sort by con state?
// prio to direct?
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
}
}
// 1 in 20 chance to ask random peer instead
// TODO: config + tweak
// TODO: save group in content to avoid the tox_peers list build
if (tox_peers.empty() || (_rng()%20) == 0) {
// meh
// HACK: determain group based on last tox_peers
if (!tox_peers.empty()) {
const uint32_t group_number = tox_peers.back().first;
auto gch = _tcm.getContactGroup(group_number);
assert(static_cast<bool>(gch));
std::vector<uint32_t> un_tox_peers;
for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
un_tox_peers.push_back(tgpe.peer_number);
}
}
if (un_tox_peers.empty()) {
// no one online, we are out of luck
} else {
const size_t sample_i = _rng()%un_tox_peers.size();
const auto peer_number = un_tox_peers.at(sample_i);
//const auto& info = msg.get<Components::FT1InfoSHA1>();
const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;
_nft.NGC_FT1_send_request_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
info_hash.data(), info_hash.size()
);
ce.emplace<Components::ReRequestInfoTimer>(0.f);
_queue_content_want_info.pop_front();
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << " (rng)\n";
}
}
} else {
const size_t sample_i = _rng()%tox_peers.size();
const auto [group_number, peer_number] = tox_peers.at(sample_i);
auto selected_peer_opt = selectPeerForRequest(ce);
if (selected_peer_opt.has_value()) {
const auto [group_number, peer_number] = selected_peer_opt.value();
//const auto& info = msg.get<Components::FT1InfoSHA1>();
const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;
@ -373,6 +371,18 @@ void SHA1_NGCFT1::iterate(float delta) {
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
}
} else if (!_queue_content_want_chunk.empty()) {
const auto ce = _queue_content_want_chunk.front();
// select chunk/make sure we still need one
auto selected_peer_opt = selectPeerForRequest(ce);
if (selected_peer_opt.has_value()) {
const auto [group_number, peer_number] = selected_peer_opt.value();
// ...
_queue_content_want_chunk.pop_front();
}
}
}
}
@ -439,7 +449,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
}
assert(msg.all_of<Components::FT1ChunkSHA1Cache>());
assert(ce.all_of<Components::FT1ChunkSHA1Cache>());
if (!ce.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
// we dont have the chunk
@ -489,9 +499,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
.v = ReceivingTransfer::Info{ce, std::vector<uint8_t>(e.file_size)};
e.accept = true;
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
} else {
// TODO assert
return false;
assert(false && "unhandled case");
}
return true;
@ -513,9 +523,19 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
info_data[i+e.data_offset] = e.data[i];
}
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
const auto offset_into_file = std::get<ReceivingTransfer::Chunk>(tv).offset_into_file;
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
assert(ce.all_of<Message::Components::Transfer::File>());
auto* file = ce.get<Message::Components::Transfer::File>().get();
assert(file != nullptr);
// TODO: avoid temporary copy
// TODO: check return
file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size});
} else {
// TODO: handle chunks
// error/ assert
assert(false && "unhandled case");
}
return true;
@ -624,7 +644,32 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
updateMessages(ce);
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
updateMessages(std::get<ReceivingTransfer::Chunk>(tv).content); // mostly for received bytes
const auto offset_into_file = std::get<ReceivingTransfer::Chunk>(tv).offset_into_file;
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
const auto& info = ce.get<Components::FT1InfoSHA1>();
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
const auto chunk_index = offset_into_file/info.chunk_size;
assert(chunk_index < info.chunks.size());
const auto chunk_size = info.chunkSize(chunk_index);
assert(offset_into_file+chunk_size <= info.file_size);
const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(offset_into_file, chunk_size);
// check hash of chunk
auto got_hash = hash_sha1(chunk_data.data(), chunk_data.size());
if (info.chunks.at(chunk_index) == got_hash) {
// TODO: check for have all
cc.have_chunk.at(chunk_index) = true;
// good chunk
ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size();
} else {
// bad chunk
// TODO: requeue?
}
updateMessages(ce); // mostly for received bytes
}
peer_transfers.erase(e.transfer_id);
@ -934,6 +979,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
// file id would be sha1_info hash or something
//reg_ptr->emplace<Message::Components::Transfer::FileID>(e, file_id);
// remove? done in updateMessages() anyway
if (ce.all_of<Message::Components::Transfer::FileInfo>()) {
reg_ptr->emplace<Message::Components::Transfer::FileInfo>(msg_e, ce.get<Message::Components::Transfer::FileInfo>());
}
@ -947,17 +993,6 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
// TODO: determine if this is true
//reg_ptr->emplace<Message::Components::Transfer::TagPaused>(e);
#if 0
const auto friend_number = _cr.get<Contact::Components::ToxFriendEphemeral>(c).friend_number;
const auto&& [transfer_id, err] = _t.toxFileSend(friend_number, file_kind, file_impl->_file_size, file_id, file_name);
if (err == TOX_ERR_FILE_SEND_OK) {
reg_ptr->emplace<Message::Components::Transfer::ToxTransferFriend>(e, friend_number, transfer_id.value());
// TODO: add tag signifying init sent status?
toxFriendLookupAdd({*reg_ptr, e});
} // else queue?
#endif
if (_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
const uint32_t group_number = _cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
uint32_t message_id = 0;

View File

@ -95,6 +95,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
void updateMessages(ContentHandle ce);
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ContentHandle ce);
public: // TODO: config
bool _udp_only {false};