sending chunks working

Green Sky 2023-08-09 23:02:29 +02:00
parent ae714fcedc
commit 853a54a9b5
3 changed files with 261 additions and 5 deletions


@ -100,6 +100,7 @@ SOLANA_PLUGIN_EXPORT void solana_plugin_tick(float delta) {
//std::cout << "PLUGIN NGCEXT TICK()\n";
g_ngcft1->iterate(delta);
g_sha1_ngcft1->iterate(delta);
}
} // extern C


@ -26,8 +26,65 @@ namespace Components {
std::vector<uint8_t> hash;
};
struct FT1ChunkSHA1Cache {
std::vector<bool> have_chunk;
bool have_all {false};
size_t have_count {0};
entt::dense_map<SHA1Digest, size_t> chunk_hash_to_index;
std::optional<size_t> chunkIndex(const SHA1Digest& hash) const;
size_t chunkSize(size_t chunk_index) const;
bool haveChunk(const SHA1Digest& hash) const;
};
} // Components
std::optional<size_t> Components::FT1ChunkSHA1Cache::chunkIndex(const SHA1Digest& hash) const {
const auto it = chunk_hash_to_index.find(hash);
if (it != chunk_hash_to_index.cend()) {
return it->second;
} else {
return std::nullopt;
}
}
bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
if (have_all) { // shortcut: we have every chunk of this file
return true;
}
if (auto i_opt = chunkIndex(hash); i_opt.has_value()) {
return have_chunk[i_opt.value()];
}
// not part of this file
return false;
}
static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
if (chunk_index+1 == sha1_info.chunks.size()) {
// last chunk
return sha1_info.file_size - chunk_index * sha1_info.chunk_size;
} else {
return sha1_info.chunk_size;
}
}
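As a worked example of the last-chunk arithmetic above, here is a minimal self-contained sketch; InfoSketch and chunkSizeSketch are hypothetical stand-ins for FT1InfoSHA1 and chunkSize, with made-up sizes:

#include <cassert>
#include <cstddef>
#include <cstdint>

// hypothetical stand-in for FT1InfoSHA1, illustration only
struct InfoSketch {
	uint64_t file_size {0};
	uint32_t chunk_size {0};
	size_t chunk_count {0}; // stands in for chunks.size()
};

static size_t chunkSizeSketch(const InfoSketch& info, size_t chunk_index) {
	if (chunk_index+1 == info.chunk_count) {
		// last chunk: whatever remains after the full chunks before it
		return info.file_size - chunk_index * info.chunk_size;
	}
	return info.chunk_size;
}

int main(void) {
	// 25 bytes in 10-byte chunks -> 3 chunks sized 10, 10, 5
	InfoSketch info{25, 10, 3};
	assert(chunkSizeSketch(info, 0) == 10);
	assert(chunkSizeSketch(info, 1) == 10);
	assert(chunkSizeSketch(info, 2) == 5); // 25 - 2*10
	return 0;
}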
void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, Message3Handle msg, const SHA1Digest& hash) {
// TODO: transfers
for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) {
// if already in queue
if (i_g == group_number && i_p == peer_number && i_h == hash) {
// update timer
i_t = 0.f;
return;
}
}
// not in queue yet
_queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, msg, hash, 0.f));
}
uint64_t SHA1_NGCFT1::combineIds(const uint32_t group_number, const uint32_t peer_number) {
return (uint64_t(group_number) << 32) | peer_number;
}
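A quick standalone sanity check of this packing; combine mirrors combineIds and the values are arbitrary:

#include <cassert>
#include <cstdint>

// mirrors combineIds: group number in the high 32 bits, peer number in the low 32
static uint64_t combine(uint32_t group_number, uint32_t peer_number) {
	return (uint64_t(group_number) << 32) | peer_number;
}

int main(void) {
	const uint64_t id = combine(2, 3);
	assert(id == 0x0000000200000003ull);
	// both halves are recoverable by shifting and masking
	assert(uint32_t(id >> 32) == 2);
	assert(uint32_t(id & 0xffffffffu) == 3);
	return 0;
}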
@ -55,6 +112,118 @@ SHA1_NGCFT1::SHA1_NGCFT1(
_rmm.subscribe(this, RegistryMessageModel_Event::send_file_path);
}
void SHA1_NGCFT1::iterate(float delta) {
{ // timers
// chunk sending
for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) {
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
it->second.time_since_activity += delta;
// if we have not heard for 10sec, timeout
if (it->second.time_since_activity >= 10.f) {
//std::cerr << "SHA1_NGCFT1 warning: sending chunk transfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n";
std::cerr << "SHA1_NGCFT1 warning: sending chunk transfer timed out " << "." << int(it->first) << "\n";
it = peer_it->second.erase(it);
} else {
it++;
}
}
if (peer_it->second.empty()) {
// clean up unused peers; maybe too aggressive?
peer_it = _sending_transfers.erase(peer_it);
} else {
peer_it++;
}
}
//for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end();) {
//float& time_since_remove_activity = std::get<float>(*it);
//time_since_remove_activity += delta;
//// if we have not heard for 10sec, timeout
//if (time_since_remove_activity >= 10.f) {
//std::cerr << "SHA1 sending chunk transfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n";
//it = _transfers_sending_chunk.erase(it);
//} else {
//it++;
//}
//}
// queued requests
for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) {
float& timer = std::get<float>(*it);
timer += delta;
if (timer >= 10.f) {
it = _queue_requested_chunk.erase(it);
} else {
it++;
}
}
}
// if we have not reached the total cap for transfers
// count running transfers
size_t running_transfer_count {0};
for (const auto& [_, transfers] : _sending_transfers) {
running_transfer_count += transfers.size();
}
if (running_transfer_count < _max_concurrent_out) {
// for each peer? transfer cap per peer?
#if 0
// first check requests for info
if (!_queue_requested_info.empty()) {
// send init to _queue_requested_info
const auto [group_number, peer_number] = _queue_requested_info.front();
if (_tcl.getGroupPeerConnectionStatus(group_number, peer_number) != TOX_CONNECTION_NONE) {
uint8_t transfer_id {0};
if (_tcl.sendFT1InitPrivate(
group_number, peer_number,
NGC_FT1_file_kind::HASH_SHA1_INFO,
_sha1_info_hash.data.data(), _sha1_info_hash.size(), // id (info hash)
_sha1_info_data.size(), // "file_size"
transfer_id
)) {
_transfers_requested_info.push_back({
group_number, peer_number,
transfer_id,
0.f
});
_queue_requested_info.pop_front();
}
}
} else
#endif
if (!_queue_requested_chunk.empty()) { // then check for chunk requests
const auto [group_number, peer_number, msg, chunk_hash, _] = _queue_requested_chunk.front();
auto chunk_idx_opt = msg.get<Components::FT1ChunkSHA1Cache>().chunkIndex(chunk_hash);
if (chunk_idx_opt.has_value()) {
const auto& info = msg.get<Components::FT1InfoSHA1>();
uint8_t transfer_id {0};
if (_nft.NGC_FT1_send_init_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
chunk_hash.data.data(), chunk_hash.size(),
chunkSize(info, chunk_idx_opt.value()),
&transfer_id
)) {
_sending_transfers
[combineIds(group_number, peer_number)]
[transfer_id] // TODO: also save index?
.v = SendingTransfer::Chunk{msg, chunk_idx_opt.value() * info.chunk_size};
}
}
// remove from queue regardless
_queue_requested_chunk.pop_front();
}
}
}
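The timer sweep above depends on erase() returning the iterator past the erased element; this is a minimal sketch of the same two-level erase-while-iterating pattern using std::map (entt::dense_map erase behaves the same way for this purpose), with made-up timer values:

#include <cassert>
#include <cstdint>
#include <map>

int main(void) {
	std::map<uint64_t, std::map<uint8_t, float>> transfers;
	transfers[1][0] = 11.f; // timed out
	transfers[1][1] = 2.f;  // still active
	transfers[2][0] = 12.f; // timed out -> peer 2 becomes empty

	for (auto peer_it = transfers.begin(); peer_it != transfers.end();) {
		for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
			if (it->second >= 10.f) {
				it = peer_it->second.erase(it); // erase returns the next iterator
			} else {
				it++;
			}
		}
		if (peer_it->second.empty()) {
			peer_it = transfers.erase(peer_it); // drop peers with no transfers left
		} else {
			peer_it++;
		}
	}

	assert(transfers.size() == 1 && transfers.count(1) == 1);
	assert(transfers[1].size() == 1 && transfers[1].count(1) == 1);
	return 0;
}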
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
// only interested in sha1
if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) {
@ -79,6 +248,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
assert(msg.all_of<Components::FT1InfoSHA1Data>());
// TODO: queue instead
//queueUpRequestInfo(e.group_number, e.peer_number, info_hash);
uint8_t transfer_id {0};
_nft.NGC_FT1_send_init_private(
e.group_number, e.peer_number,
@ -92,9 +263,35 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
[combineIds(e.group_number, e.peer_number)]
[transfer_id]
.v = SendingTransfer::Info{msg.get<Components::FT1InfoSHA1Data>().data};
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
if (e.file_id_size != 20) {
// error
return false;
}
SHA1Digest chunk_hash{e.file_id, e.file_id_size};
if (!_chunks.count(chunk_hash)) {
// we don't know about this chunk
return false;
}
auto msg = _chunks.at(chunk_hash);
assert(msg.all_of<Components::FT1ChunkSHA1Cache>());
if (!msg.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
// we don't have the chunk
return false;
}
// queue good request
queueUpRequestChunk(e.group_number, e.peer_number, msg, chunk_hash);
} else {
assert(false && "unhandled case");
}
return false;
return true;
}
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
@ -132,8 +329,28 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
// was last read (probably TODO: add transfer destruction event)
peer.erase(e.transfer_id);
}
} else if (std::holds_alternative<SendingTransfer::Chunk>(transfer.v)) {
auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
const auto data = chunk_transfer.msg.get<Message::Components::Transfer::File>()->read(chunk_transfer.offset_into_file + e.data_offset, e.data_size);
// TODO: optimize
for (size_t i = 0; i < e.data_size && i < data.size(); i++) {
e.data[i] = data[i];
}
chunk_transfer.msg.get_or_emplace<Message::Components::Transfer::BytesSent>().total += data.size();
//_rmm.throwEventUpdate(transfer); // should we?
//if (e.data_offset + e.data_size >= *insert chunk size here*) {
//// was last read (probably TODO: add transfer destruction event)
//peer.erase(e.transfer_id);
//}
} else {
assert(false && "not implemented?");
}
transfer.time_since_activity = 0.f;
return true;
}
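For the chunk branch above, the position read from the file is the chunk's precomputed base offset plus the transfer-relative offset supplied by the event; a small worked example with hypothetical numbers:

#include <cassert>
#include <cstdint>

int main(void) {
	// hypothetical values: chunk index 3 of a file with 0x8000-byte chunks
	const uint64_t chunk_size = 0x8000;
	const uint64_t offset_into_file = 3 * chunk_size; // set when the transfer starts
	const uint64_t data_offset = 1024;                // position within the chunk (from the event)
	// the read in the handler lands here:
	assert(offset_into_file + data_offset == 0x18400);
	return 0;
}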
@ -210,7 +427,19 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
reg_ptr->emplace<Components::FT1InfoSHA1>(e, sha1_info);
reg_ptr->emplace<Components::FT1InfoSHA1Data>(e, sha1_info_data); // keep around? or file?
reg_ptr->emplace<Components::FT1InfoSHA1Hash>(e, sha1_info_hash);
_info_to_message[sha1_info_hash] = {*reg_ptr, e};
{ // lookup tables and have
auto& cc = reg_ptr->emplace<Components::FT1ChunkSHA1Cache>(e);
cc.have_all = true;
// skip filling the have vec, since we have all chunks
//cc.have_chunk
cc.have_count = sha1_info.chunks.size(); // need?
_info_to_message[sha1_info_hash] = {*reg_ptr, e};
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
_chunks[sha1_info.chunks[i]] = {*reg_ptr, e};
cc.chunk_hash_to_index[sha1_info.chunks[i]] = i;
}
}
//reg_ptr->emplace<Message::Components::Transfer::FileKind>(e, file_kind);
// file id would be sha1_info hash or something


@ -9,7 +9,8 @@
#include "./ngcft1.hpp"
#include "./ft1_sha1_info.hpp"
#include "entt/container/dense_map.hpp"
#include <entt/container/dense_map.hpp>
#include <variant>
@ -19,9 +20,19 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
NGCFT1& _nft;
ToxContactModel2& _tcm;
// limit this to each group?
entt::dense_map<SHA1Digest, Message3Handle> _info_to_message;
// sha1 chunk index
// TODO: optimize lookup
entt::dense_map<SHA1Digest, Message3Handle> _chunks;
// group_number, peer_number, message, chunk_hash, timer
std::deque<std::tuple<uint32_t, uint32_t, Message3Handle, SHA1Digest, float>> _queue_requested_chunk;
//void queueUpRequestInfo(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash);
void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, Message3Handle msg, const SHA1Digest& hash);
struct SendingTransfer {
struct Info {
// copy of info data
@ -29,13 +40,28 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
std::vector<uint8_t> info_data;
};
std::variant<Info> v;
struct Chunk {
Message3Handle msg;
uint64_t offset_into_file;
// or data?
// if memmapped, this would be just a pointer
};
std::variant<Info, Chunk> v;
float time_since_activity {0.f};
};
// key is group_number and peer_number packed together (see combineIds)
entt::dense_map<uint64_t, entt::dense_map<uint8_t, SendingTransfer>> _sending_transfers;
static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number);
public: // TODO: config
bool _udp_only {false};
size_t _max_concurrent_in {8};
size_t _max_concurrent_out {4};
public:
SHA1_NGCFT1(
Contact3Registry& cr,
@ -44,7 +70,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
ToxContactModel2& tcm
);
//void iterate(float delta);
void iterate(float delta);
protected: // events
bool onEvent(const Events::NGCFT1_recv_request&) override;