refactor sending transfers the same way as receiving

This commit is contained in:
Green Sky 2024-07-13 13:52:43 +02:00
parent ca89e43a40
commit bc5599a230
No known key found for this signature in database
5 changed files with 241 additions and 130 deletions

View File

@ -73,6 +73,9 @@ add_library(solanaceae_sha1_ngcft1
./solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp
./solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp
./solanaceae/ngc_ft1_sha1/sending_transfers.hpp
./solanaceae/ngc_ft1_sha1/sending_transfers.cpp
./solanaceae/ngc_ft1_sha1/receiving_transfers.hpp
./solanaceae/ngc_ft1_sha1/receiving_transfers.cpp

View File

@ -0,0 +1,126 @@
#include "./sending_transfers.hpp"
#include <iostream>
#include <cassert>
void SendingTransfers::tick(float delta) {
for (auto peer_it = _data.begin(); peer_it != _data.end();) {
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
it->second.time_since_activity += delta;
// if we have not heard for 2min, timeout (lower level event on real timeout)
// TODO: do we really need this if we get events?
if (it->second.time_since_activity >= 120.f) {
std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n";
assert(false);
it = peer_it->second.erase(it);
} else {
it++;
}
}
if (peer_it->second.empty()) {
// cleanup unused peers too agressive?
peer_it = _data.erase(peer_it);
} else {
peer_it++;
}
}
}
SendingTransfers::Entry& SendingTransfers::emplaceInfo(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Info& info) {
auto& ent = _data[combine_ids(group_number, peer_number)][transfer_id];
ent.v = info;
return ent;
}
SendingTransfers::Entry& SendingTransfers::emplaceChunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Chunk& chunk) {
assert(!containsPeerChunk(group_number, peer_number, chunk.content, chunk.chunk_index));
auto& ent = _data[combine_ids(group_number, peer_number)][transfer_id];
ent.v = chunk;
return ent;
}
bool SendingTransfers::containsPeerTransfer(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id) const {
auto it = _data.find(combine_ids(group_number, peer_number));
if (it == _data.end()) {
return false;
}
return it->second.count(transfer_id);
}
bool SendingTransfers::containsChunk(ObjectHandle o, size_t chunk_idx) const {
for (const auto& [_, p] : _data) {
for (const auto& [_2, v] : p) {
if (!v.isChunk()) {
continue;
}
const auto& c = v.getChunk();
if (c.content != o) {
continue;
}
if (c.chunk_index == chunk_idx) {
return true;
}
}
}
return false;
}
bool SendingTransfers::containsPeerChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle o, size_t chunk_idx) const {
auto it = _data.find(combine_ids(group_number, peer_number));
if (it == _data.end()) {
return false;
}
for (const auto& [_, v] : it->second) {
if (!v.isChunk()) {
continue;
}
const auto& c = v.getChunk();
if (c.content != o) {
continue;
}
if (c.chunk_index == chunk_idx) {
return true;
}
}
return false;
}
void SendingTransfers::removePeer(uint32_t group_number, uint32_t peer_number) {
_data.erase(combine_ids(group_number, peer_number));
}
void SendingTransfers::removePeerTransfer(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id) {
auto it = _data.find(combine_ids(group_number, peer_number));
if (it == _data.end()) {
return;
}
it->second.erase(transfer_id);
}
size_t SendingTransfers::size(void) const {
size_t count {0};
for (const auto& [_, p] : _data) {
count += p.size();
}
return count;
}
size_t SendingTransfers::sizePeer(uint32_t group_number, uint32_t peer_number) const {
auto it = _data.find(combine_ids(group_number, peer_number));
if (it == _data.end()) {
return 0;
}
return it->second.size();
}

View File

@ -0,0 +1,67 @@
#pragma once
#include <solanaceae/object_store/object_store.hpp>
#include <entt/container/dense_map.hpp>
#include "./util.hpp"
#include <cstdint>
#include <variant>
#include <vector>
struct SendingTransfers {
struct Entry {
struct Info {
// copy of info data
// too large?
std::vector<uint8_t> info_data;
};
struct Chunk {
ObjectHandle content;
size_t chunk_index; // <.< remove offset_into_file
//uint64_t offset_into_file;
// or data?
// if memmapped, this would be just a pointer
};
std::variant<Info, Chunk> v;
float time_since_activity {0.f};
bool isInfo(void) const { return std::holds_alternative<Info>(v); }
bool isChunk(void) const { return std::holds_alternative<Chunk>(v); }
Info& getInfo(void) { return std::get<Info>(v); }
const Info& getInfo(void) const { return std::get<Info>(v); }
Chunk& getChunk(void) { return std::get<Chunk>(v); }
const Chunk& getChunk(void) const { return std::get<Chunk>(v); }
};
// key is groupid + peerid
// TODO: replace with contact
entt::dense_map<uint64_t, entt::dense_map<uint8_t, Entry>> _data;
void tick(float delta);
Entry& emplaceInfo(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Info& info);
Entry& emplaceChunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Chunk& chunk);
bool containsPeer(uint32_t group_number, uint32_t peer_number) const { return _data.count(combine_ids(group_number, peer_number)); }
bool containsPeerTransfer(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id) const;
// less reliable, since we dont keep the list of chunk idecies
bool containsChunk(ObjectHandle o, size_t chunk_idx) const;
bool containsPeerChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle o, size_t chunk_idx) const;
auto& getPeer(uint32_t group_number, uint32_t peer_number) { return _data.at(combine_ids(group_number, peer_number)); }
auto& getTransfer(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id) { return getPeer(group_number, peer_number).at(transfer_id); }
void removePeer(uint32_t group_number, uint32_t peer_number);
void removePeerTransfer(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id);
size_t size(void) const;
size_t sizePeer(uint32_t group_number, uint32_t peer_number) const;
};

View File

@ -48,7 +48,7 @@ static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
}
}
void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash) {
void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle obj, const SHA1Digest& hash) {
for (auto& [i_g, i_p, i_o, i_h, i_t] : _queue_requested_chunk) {
// if already in queue
if (i_g == group_number && i_p == peer_number && i_h == hash) {
@ -59,33 +59,17 @@ void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_numbe
}
// check for running transfer
if (_sending_transfers.count(combine_ids(group_number, peer_number))) {
for (const auto& [_, transfer] : _sending_transfers.at(combine_ids(group_number, peer_number))) {
if (std::holds_alternative<SendingTransfer::Info>(transfer.v)) {
// ignore info
continue;
}
const auto& t_c = std::get<SendingTransfer::Chunk>(transfer.v);
if (content != t_c.content) {
// ignore different content
continue;
}
auto chunk_idx_vec = content.get<Components::FT1ChunkSHA1Cache>().chunkIndices(hash);
for (size_t idx : chunk_idx_vec) {
if (idx == t_c.chunk_index) {
// already sending
return; // skip
}
}
auto chunk_idx_vec = obj.get<Components::FT1ChunkSHA1Cache>().chunkIndices(hash);
// list is 1 entry in 99% of cases
for (const size_t chunk_idx : chunk_idx_vec) {
if (_sending_transfers.containsPeerChunk(group_number, peer_number, obj, chunk_idx)) {
// already sending
return; // skip
}
}
// not in queue yet
_queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, content, hash, 0.f));
_queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, obj, hash, 0.f));
}
void SHA1_NGCFT1::updateMessages(ObjectHandle ce) {
@ -253,28 +237,7 @@ float SHA1_NGCFT1::iterate(float delta) {
{ // timers
// sending transfers
for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) {
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
it->second.time_since_activity += delta;
// if we have not heard for 2min, timeout (lower level event on real timeout)
// TODO: do we really need this if we get events?
if (it->second.time_since_activity >= 120.f) {
std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n";
assert(false);
it = peer_it->second.erase(it);
} else {
it++;
}
}
if (peer_it->second.empty()) {
// cleanup unused peers too agressive?
peer_it = _sending_transfers.erase(peer_it);
} else {
peer_it++;
}
}
_sending_transfers.tick(delta);
// receiving transfers
_receiving_transfers.tick(delta);
@ -378,10 +341,7 @@ float SHA1_NGCFT1::iterate(float delta) {
// if we have not reached the total cap for transfers
// count running transfers
size_t running_sending_transfer_count {0};
for (const auto& [_, transfers] : _sending_transfers) {
running_sending_transfer_count += transfers.size();
}
size_t running_sending_transfer_count {_sending_transfers.size()};
size_t running_receiving_transfer_count {_receiving_transfers.size()};
if (running_sending_transfer_count < _max_concurrent_out) {
@ -394,21 +354,7 @@ float SHA1_NGCFT1::iterate(float delta) {
if (!chunk_idx_vec.empty()) {
// check if already sending
bool already_sending_to_this_peer = false;
if (_sending_transfers.count(combine_ids(group_number, peer_number))) {
for (const auto& [_2, t] : _sending_transfers.at(combine_ids(group_number, peer_number))) {
if (std::holds_alternative<SendingTransfer::Chunk>(t.v)) {
const auto& v = std::get<SendingTransfer::Chunk>(t.v);
if (v.content == ce && v.chunk_index == chunk_idx_vec.front()) {
// already sending
already_sending_to_this_peer = true;
break;
}
}
}
}
if (!already_sending_to_this_peer) {
if (!_sending_transfers.containsPeerChunk(group_number, peer_number, ce, chunk_idx_vec.front())) {
const auto& info = ce.get<Components::FT1InfoSHA1>();
uint8_t transfer_id {0};
@ -419,10 +365,14 @@ float SHA1_NGCFT1::iterate(float delta) {
chunkSize(info, chunk_idx_vec.front()),
&transfer_id
)) {
_sending_transfers
[combine_ids(group_number, peer_number)]
[transfer_id] // TODO: also save index?
.v = SendingTransfer::Chunk{ce, chunk_idx_vec.front()};
_sending_transfers.emplaceChunk(
group_number, peer_number,
transfer_id,
SendingTransfers::Entry::Chunk{
ce,
chunk_idx_vec.front()
}
);
}
} // else just remove from queue
}
@ -649,9 +599,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
return false;
}
auto content = _info_to_content.at(info_hash);
auto o = _info_to_content.at(info_hash);
if (!content.all_of<Components::FT1InfoSHA1Data>()) {
if (!o.all_of<Components::FT1InfoSHA1Data>()) {
// we dont have the info for that infohash (yet?)
return false;
}
@ -663,14 +613,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
e.group_number, e.peer_number,
static_cast<uint32_t>(e.file_kind),
e.file_id, e.file_id_size,
content.get<Components::FT1InfoSHA1Data>().data.size(),
o.get<Components::FT1InfoSHA1Data>().data.size(),
&transfer_id
);
_sending_transfers
[combine_ids(e.group_number, e.peer_number)]
[transfer_id]
.v = SendingTransfer::Info{content.get<Components::FT1InfoSHA1Data>().data};
_sending_transfers.emplaceInfo(
e.group_number, e.peer_number,
transfer_id,
SendingTransfers::Entry::Info{
o.get<Components::FT1InfoSHA1Data>().data
}
);
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
@ -872,30 +825,20 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
}
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
if (!_sending_transfers.count(combine_ids(e.group_number, e.peer_number))) {
if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
return false;
}
auto& peer = _sending_transfers.at(combine_ids(e.group_number, e.peer_number));
if (!peer.count(e.transfer_id)) {
return false;
}
auto& transfer = peer.at(e.transfer_id);
auto& transfer = _sending_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id);
transfer.time_since_activity = 0.f;
if (std::holds_alternative<SendingTransfer::Info>(transfer.v)) {
auto& info_transfer = std::get<SendingTransfer::Info>(transfer.v);
if (transfer.isInfo()) {
auto& info_transfer = transfer.getInfo();
for (size_t i = 0; i < e.data_size && (i + e.data_offset) < info_transfer.info_data.size(); i++) {
e.data[i] = info_transfer.info_data[i + e.data_offset];
}
if (e.data_offset + e.data_size >= info_transfer.info_data.size()) {
// was last read (probably TODO: add transfer destruction event)
peer.erase(e.transfer_id);
}
} else if (std::holds_alternative<SendingTransfer::Chunk>(transfer.v)) {
auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
} else if (transfer.isChunk()) {
auto& chunk_transfer = transfer.getChunk();
const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
// TODO: should we really use file?
const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read(
@ -912,11 +855,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
// TODO: add event to propergate to messages
//_rmm.throwEventUpdate(transfer); // should we?
//if (e.data_offset + e.data_size >= *insert chunk size here*) {
//// was last read (probably TODO: add transfer destruction event)
//peer.erase(e.transfer_id);
//}
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (static_cast<bool>(c)) {
chunk_transfer.content.get_or_emplace<Components::TransferStatsTally>()
@ -1102,20 +1040,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
}
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
if (!_sending_transfers.count(combine_ids(e.group_number, e.peer_number))) {
if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
return false;
}
auto& peer_transfers = _sending_transfers.at(combine_ids(e.group_number, e.peer_number));
if (!peer_transfers.count(e.transfer_id)) {
return false;
auto& transfer = _sending_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id);
if (transfer.isChunk()) {
updateMessages(transfer.getChunk().content); // mostly for sent bytes
}
const auto& tv = peer_transfers[e.transfer_id].v;
if (std::holds_alternative<SendingTransfer::Chunk>(tv)) {
updateMessages(std::get<SendingTransfer::Chunk>(tv).content); // mostly for sent bytes
}
peer_transfers.erase(e.transfer_id);
_sending_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id);
return true;
}

View File

@ -11,6 +11,7 @@
#include <solanaceae/ngc_ft1/ngcft1.hpp>
#include "./ft1_sha1_info.hpp"
#include "./sending_transfers.hpp"
#include "./receiving_transfers.hpp"
#include <entt/entity/registry.hpp>
@ -54,28 +55,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
//void queueUpRequestInfo(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash);
void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash);
struct SendingTransfer {
struct Info {
// copy of info data
// too large?
std::vector<uint8_t> info_data;
};
struct Chunk {
ObjectHandle content;
size_t chunk_index; // <.< remove offset_into_file
//uint64_t offset_into_file;
// or data?
// if memmapped, this would be just a pointer
};
std::variant<Info, Chunk> v;
float time_since_activity {0.f};
};
// key is groupid + peerid
entt::dense_map<uint64_t, entt::dense_map<uint8_t, SendingTransfer>> _sending_transfers;
SendingTransfers _sending_transfers;
ReceivingTransfers _receiving_transfers;
// makes request rotate around open content
@ -87,7 +67,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
};
std::deque<QBitsetEntry> _queue_send_bitset;
// workaround missing contact events
// FIXME: workaround missing contact events
// only used to remove participation on peer exit
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;