receiving transfers refactor
This commit is contained in:
parent
3286a7228c
commit
33560f8f8a
@ -62,6 +62,9 @@ add_library(solanaceae_sha1_ngcft1
|
|||||||
./solanaceae/ngc_ft1_sha1/participation.hpp
|
./solanaceae/ngc_ft1_sha1/participation.hpp
|
||||||
./solanaceae/ngc_ft1_sha1/participation.cpp
|
./solanaceae/ngc_ft1_sha1/participation.cpp
|
||||||
|
|
||||||
|
./solanaceae/ngc_ft1_sha1/receiving_transfers.hpp
|
||||||
|
./solanaceae/ngc_ft1_sha1/receiving_transfers.cpp
|
||||||
|
|
||||||
./solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
|
./solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
|
||||||
./solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
|
./solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
|
||||||
)
|
)
|
||||||
|
122
solanaceae/ngc_ft1_sha1/receiving_transfers.cpp
Normal file
122
solanaceae/ngc_ft1_sha1/receiving_transfers.cpp
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
#include "./receiving_transfers.hpp"
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
void ReceivingTransfers::tick(float delta) {
|
||||||
|
for (auto peer_it = _data.begin(); peer_it != _data.end();) {
|
||||||
|
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
|
||||||
|
it->second.time_since_activity += delta;
|
||||||
|
|
||||||
|
// if we have not heard for 20sec, timeout
|
||||||
|
if (it->second.time_since_activity >= 20.f) {
|
||||||
|
std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n";
|
||||||
|
// TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading
|
||||||
|
//it->second.v
|
||||||
|
it = peer_it->second.erase(it);
|
||||||
|
} else {
|
||||||
|
it++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (peer_it->second.empty()) {
|
||||||
|
// cleanup unused peers too agressive?
|
||||||
|
peer_it = _data.erase(peer_it);
|
||||||
|
} else {
|
||||||
|
peer_it++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ReceivingTransfers::Entry& ReceivingTransfers::emplaceInfo(uint64_t combined_id, uint8_t transfer_id, const Entry::Info& info) {
|
||||||
|
auto& ent = _data[combined_id][transfer_id];
|
||||||
|
ent.v = info;
|
||||||
|
return ent;
|
||||||
|
}
|
||||||
|
|
||||||
|
ReceivingTransfers::Entry& ReceivingTransfers::emplaceChunk(uint64_t combined_id, uint8_t transfer_id, const Entry::Chunk& chunk) {
|
||||||
|
assert(!chunk.chunk_indices.empty());
|
||||||
|
assert(!containsPeerChunk(combined_id, chunk.content, chunk.chunk_indices.front()));
|
||||||
|
auto& ent = _data[combined_id][transfer_id];
|
||||||
|
ent.v = chunk;
|
||||||
|
return ent;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ReceivingTransfers::containsPeerTransfer(uint64_t combined_id, uint8_t transfer_id) const {
|
||||||
|
auto it = _data.find(combined_id);
|
||||||
|
if (it == _data.end()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return it->second.count(transfer_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ReceivingTransfers::containsChunk(ObjectHandle o, size_t chunk_idx) const {
|
||||||
|
for (const auto& [_, p] : _data) {
|
||||||
|
for (const auto& [_2, v] : p) {
|
||||||
|
if (!v.isChunk()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto& c = v.getChunk();
|
||||||
|
if (c.content != o) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto idx : c.chunk_indices) {
|
||||||
|
if (idx == chunk_idx) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ReceivingTransfers::containsPeerChunk(uint64_t combined_id, ObjectHandle o, size_t chunk_idx) const {
|
||||||
|
auto it = _data.find(combined_id);
|
||||||
|
if (it == _data.end()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto& [_, v] : it->second) {
|
||||||
|
if (!v.isChunk()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto& c = v.getChunk();
|
||||||
|
if (c.content != o) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto idx : c.chunk_indices) {
|
||||||
|
if (idx == chunk_idx) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReceivingTransfers::removePeer(uint64_t combined_id) {
|
||||||
|
_data.erase(combined_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReceivingTransfers::removePeerTransfer(uint64_t combined_id, uint8_t transfer_id) {
|
||||||
|
auto it = _data.find(combined_id);
|
||||||
|
if (it == _data.end()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
it->second.erase(transfer_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t ReceivingTransfers::size(void) const {
|
||||||
|
size_t count {0};
|
||||||
|
for (const auto& [_, p] : _data) {
|
||||||
|
count += p.size();
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
63
solanaceae/ngc_ft1_sha1/receiving_transfers.hpp
Normal file
63
solanaceae/ngc_ft1_sha1/receiving_transfers.hpp
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <solanaceae/object_store/object_store.hpp>
|
||||||
|
|
||||||
|
#include <entt/container/dense_map.hpp>
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
#include <variant>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
struct ReceivingTransfers {
|
||||||
|
struct Entry {
|
||||||
|
struct Info {
|
||||||
|
ObjectHandle content;
|
||||||
|
// copy of info data
|
||||||
|
// too large?
|
||||||
|
std::vector<uint8_t> info_data;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Chunk {
|
||||||
|
ObjectHandle content;
|
||||||
|
std::vector<size_t> chunk_indices;
|
||||||
|
// or data?
|
||||||
|
// if memmapped, this would be just a pointer
|
||||||
|
};
|
||||||
|
|
||||||
|
std::variant<Info, Chunk> v;
|
||||||
|
|
||||||
|
float time_since_activity {0.f};
|
||||||
|
|
||||||
|
bool isInfo(void) const { return std::holds_alternative<Info>(v); }
|
||||||
|
bool isChunk(void) const { return std::holds_alternative<Chunk>(v); }
|
||||||
|
|
||||||
|
Info& getInfo(void) { return std::get<Info>(v); }
|
||||||
|
const Info& getInfo(void) const { return std::get<Info>(v); }
|
||||||
|
Chunk& getChunk(void) { return std::get<Chunk>(v); }
|
||||||
|
const Chunk& getChunk(void) const { return std::get<Chunk>(v); }
|
||||||
|
};
|
||||||
|
|
||||||
|
// key is groupid + peerid
|
||||||
|
// TODO: replace with contact
|
||||||
|
//using ReceivingTransfers = entt::dense_map<uint64_t, entt::dense_map<uint8_t, ReceivingTransferE>>;
|
||||||
|
entt::dense_map<uint64_t, entt::dense_map<uint8_t, Entry>> _data;
|
||||||
|
|
||||||
|
void tick(float delta);
|
||||||
|
|
||||||
|
Entry& emplaceInfo(uint64_t combined_id, uint8_t transfer_id, const Entry::Info& info);
|
||||||
|
Entry& emplaceChunk(uint64_t combined_id, uint8_t transfer_id, const Entry::Chunk& chunk);
|
||||||
|
|
||||||
|
bool containsPeer(uint64_t combined_id) const { return _data.count(combined_id); }
|
||||||
|
bool containsPeerTransfer(uint64_t combined_id, uint8_t transfer_id) const;
|
||||||
|
bool containsChunk(ObjectHandle o, size_t chunk_idx) const;
|
||||||
|
bool containsPeerChunk(uint64_t combined_id, ObjectHandle o, size_t chunk_idx) const;
|
||||||
|
|
||||||
|
auto& getPeer(uint64_t combined_id) { return _data.at(combined_id); }
|
||||||
|
auto& getTransfer(uint64_t combined_id, uint8_t transfer_id) { return getPeer(combined_id).at(transfer_id); }
|
||||||
|
|
||||||
|
void removePeer(uint64_t combined_id);
|
||||||
|
void removePeerTransfer(uint64_t combined_id, uint8_t transfer_id);
|
||||||
|
|
||||||
|
size_t size(void) const;
|
||||||
|
};
|
||||||
|
|
@ -256,28 +256,7 @@ void SHA1_NGCFT1::iterate(float delta) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// receiving transfers
|
// receiving transfers
|
||||||
for (auto peer_it = _receiving_transfers.begin(); peer_it != _receiving_transfers.end();) {
|
_receiving_transfers.tick(delta);
|
||||||
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
|
|
||||||
it->second.time_since_activity += delta;
|
|
||||||
|
|
||||||
// if we have not heard for 10sec, timeout
|
|
||||||
if (it->second.time_since_activity >= 20.f) {
|
|
||||||
std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n";
|
|
||||||
// TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading
|
|
||||||
//it->second.v
|
|
||||||
it = peer_it->second.erase(it);
|
|
||||||
} else {
|
|
||||||
it++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (peer_it->second.empty()) {
|
|
||||||
// cleanup unused peers too agressive?
|
|
||||||
peer_it = _receiving_transfers.erase(peer_it);
|
|
||||||
} else {
|
|
||||||
peer_it++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// queued requests
|
// queued requests
|
||||||
for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) {
|
for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) {
|
||||||
@ -330,10 +309,7 @@ void SHA1_NGCFT1::iterate(float delta) {
|
|||||||
for (const auto& [_, transfers] : _sending_transfers) {
|
for (const auto& [_, transfers] : _sending_transfers) {
|
||||||
running_sending_transfer_count += transfers.size();
|
running_sending_transfer_count += transfers.size();
|
||||||
}
|
}
|
||||||
size_t running_receiving_transfer_count {0};
|
size_t running_receiving_transfer_count {_receiving_transfers.size()};
|
||||||
for (const auto& [_, transfers] : _receiving_transfers) {
|
|
||||||
running_receiving_transfer_count += transfers.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (running_sending_transfer_count < _max_concurrent_out) {
|
if (running_sending_transfer_count < _max_concurrent_out) {
|
||||||
// TODO: for each peer? transfer cap per peer?
|
// TODO: for each peer? transfer cap per peer?
|
||||||
@ -722,10 +698,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
_receiving_transfers
|
_receiving_transfers.emplaceInfo(
|
||||||
[combineIds(e.group_number, e.peer_number)]
|
combineIds(e.group_number, e.peer_number),
|
||||||
[e.transfer_id]
|
e.transfer_id,
|
||||||
.v = ReceivingTransfer::Info{ce, std::vector<uint8_t>(e.file_size)};
|
{ce, std::vector<uint8_t>(e.file_size)}
|
||||||
|
);
|
||||||
|
|
||||||
e.accept = true;
|
e.accept = true;
|
||||||
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
|
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
|
||||||
@ -765,10 +742,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
|||||||
// TODO: check e.file_size
|
// TODO: check e.file_size
|
||||||
assert(e.file_size == info.chunkSize(idx_vec.front()));
|
assert(e.file_size == info.chunkSize(idx_vec.front()));
|
||||||
|
|
||||||
_receiving_transfers
|
_receiving_transfers.emplaceChunk(
|
||||||
[combineIds(e.group_number, e.peer_number)]
|
combineIds(e.group_number, e.peer_number),
|
||||||
[e.transfer_id]
|
e.transfer_id,
|
||||||
.v = ReceivingTransfer::Chunk{ce, idx_vec};
|
ReceivingTransfers::Entry::Chunk{ce, idx_vec}
|
||||||
|
);
|
||||||
|
|
||||||
e.accept = true;
|
e.accept = true;
|
||||||
|
|
||||||
@ -781,31 +759,28 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
|
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
|
||||||
if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
|
if (!_receiving_transfers.containsPeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
|
auto& transfer = _receiving_transfers.getTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id);
|
||||||
if (!peer_transfers.count(e.transfer_id)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto& tv = peer_transfers[e.transfer_id].v;
|
transfer.time_since_activity = 0.f;
|
||||||
peer_transfers[e.transfer_id].time_since_activity = 0.f;
|
if (transfer.isInfo()) {
|
||||||
if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
|
auto& info_data = transfer.getInfo().info_data;
|
||||||
auto& info_data = std::get<ReceivingTransfer::Info>(tv).info_data;
|
|
||||||
for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
|
for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
|
||||||
info_data[i+e.data_offset] = e.data[i];
|
info_data[i+e.data_offset] = e.data[i];
|
||||||
}
|
}
|
||||||
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
|
} else if (transfer.isChunk()) {
|
||||||
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
|
auto o = transfer.getChunk().content;
|
||||||
|
|
||||||
assert(ce.all_of<Message::Components::Transfer::File>());
|
assert(o.all_of<Message::Components::Transfer::File>());
|
||||||
auto* file = ce.get<Message::Components::Transfer::File>().get();
|
auto* file = o.get<Message::Components::Transfer::File>().get();
|
||||||
assert(file != nullptr);
|
assert(file != nullptr);
|
||||||
|
|
||||||
for (const auto chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
|
const auto chunk_size = o.get<Components::FT1InfoSHA1>().chunk_size;
|
||||||
const auto offset_into_file = chunk_index* ce.get<Components::FT1InfoSHA1>().chunk_size;
|
for (const auto chunk_index : transfer.getChunk().chunk_indices) {
|
||||||
|
const auto offset_into_file = chunk_index * chunk_size;
|
||||||
|
|
||||||
if (!file->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
|
if (!file->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
|
||||||
std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
|
std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
|
||||||
@ -871,72 +846,68 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||||
if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
|
if (!_receiving_transfers.containsPeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
|
auto& transfer = _receiving_transfers.getTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id);
|
||||||
if (!peer_transfers.count(e.transfer_id)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto& tv = peer_transfers[e.transfer_id].v;
|
if (transfer.isInfo()) {
|
||||||
if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
|
auto& info = transfer.getInfo();
|
||||||
auto& info = std::get<ReceivingTransfer::Info>(tv);
|
auto o = info.content;
|
||||||
auto ce = info.content;
|
|
||||||
|
|
||||||
if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data>()) {
|
if (o.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data>()) {
|
||||||
// we already have the info, discard
|
// we already have the info, discard
|
||||||
peer_transfers.erase(e.transfer_id);
|
_receiving_transfers.removePeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if data matches hash
|
// check if data matches hash
|
||||||
auto hash = hash_sha1(info.info_data.data(), info.info_data.size());
|
auto hash = hash_sha1(info.info_data.data(), info.info_data.size());
|
||||||
|
|
||||||
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
|
assert(o.all_of<Components::FT1InfoSHA1Hash>());
|
||||||
if (ce.get<Components::FT1InfoSHA1Hash>().hash != hash) {
|
if (o.get<Components::FT1InfoSHA1Hash>().hash != hash) {
|
||||||
std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n";
|
std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n";
|
||||||
// requeue info request
|
// TODO: requeue info request; eg manipulate o.get<Components::ReRequestInfoTimer>();
|
||||||
peer_transfers.erase(e.transfer_id);
|
_receiving_transfers.removePeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto& info_data = ce.emplace_or_replace<Components::FT1InfoSHA1Data>(std::move(info.info_data)).data;
|
const auto& info_data = o.emplace_or_replace<Components::FT1InfoSHA1Data>(std::move(info.info_data)).data;
|
||||||
auto& ft_info = ce.emplace_or_replace<Components::FT1InfoSHA1>();
|
auto& ft_info = o.emplace_or_replace<Components::FT1InfoSHA1>();
|
||||||
ft_info.fromBuffer(info_data);
|
ft_info.fromBuffer(info_data);
|
||||||
|
|
||||||
{ // file info
|
{ // file info
|
||||||
// TODO: not overwrite fi? since same?
|
// TODO: not overwrite fi? since same?
|
||||||
auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
|
auto& file_info = o.emplace_or_replace<Message::Components::Transfer::FileInfo>();
|
||||||
file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
|
file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
|
||||||
file_info.total_size = ft_info.file_size;
|
file_info.total_size = ft_info.file_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";
|
std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";
|
||||||
|
|
||||||
ce.remove<Components::ReRequestInfoTimer>();
|
o.remove<Components::ReRequestInfoTimer>();
|
||||||
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) {
|
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), o); it != _queue_content_want_info.end()) {
|
||||||
_queue_content_want_info.erase(it);
|
_queue_content_want_info.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
ce.emplace_or_replace<Message::Components::Transfer::TagPaused>();
|
o.emplace_or_replace<Message::Components::Transfer::TagPaused>();
|
||||||
|
|
||||||
updateMessages(ce);
|
updateMessages(o);
|
||||||
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
|
} else if (transfer.isChunk()) {
|
||||||
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
|
auto o = transfer.getChunk().content;
|
||||||
const auto& info = ce.get<Components::FT1InfoSHA1>();
|
const auto& info = o.get<Components::FT1InfoSHA1>();
|
||||||
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
|
auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
|
||||||
|
|
||||||
// HACK: only check first chunk (they *should* all be the same)
|
// HACK: only check first chunk (they *should* all be the same)
|
||||||
const auto chunk_index = std::get<ReceivingTransfer::Chunk>(tv).chunk_indices.front();
|
const auto chunk_index = transfer.getChunk().chunk_indices.front();
|
||||||
const uint64_t offset_into_file = chunk_index * uint64_t(info.chunk_size);
|
const uint64_t offset_into_file = chunk_index * uint64_t(info.chunk_size);
|
||||||
|
|
||||||
assert(chunk_index < info.chunks.size());
|
assert(chunk_index < info.chunks.size());
|
||||||
const auto chunk_size = info.chunkSize(chunk_index);
|
const auto chunk_size = info.chunkSize(chunk_index);
|
||||||
assert(offset_into_file+chunk_size <= info.file_size);
|
assert(offset_into_file+chunk_size <= info.file_size);
|
||||||
|
|
||||||
const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(chunk_size, offset_into_file);
|
const auto chunk_data = o.get<Message::Components::Transfer::File>()->read(chunk_size, offset_into_file);
|
||||||
assert(!chunk_data.empty());
|
assert(!chunk_data.empty());
|
||||||
|
|
||||||
// check hash of chunk
|
// check hash of chunk
|
||||||
@ -945,7 +916,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
|||||||
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
|
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
|
||||||
|
|
||||||
if (!cc.have_all) {
|
if (!cc.have_all) {
|
||||||
for (const auto inner_chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
|
for (const auto inner_chunk_index : transfer.getChunk().chunk_indices) {
|
||||||
if (!cc.have_all && !cc.have_chunk[inner_chunk_index]) {
|
if (!cc.have_all && !cc.have_chunk[inner_chunk_index]) {
|
||||||
cc.have_chunk.set(inner_chunk_index);
|
cc.have_chunk.set(inner_chunk_index);
|
||||||
cc.have_count += 1;
|
cc.have_count += 1;
|
||||||
@ -962,33 +933,33 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
|||||||
// HACK: remap file, to clear ram
|
// HACK: remap file, to clear ram
|
||||||
|
|
||||||
// TODO: error checking
|
// TODO: error checking
|
||||||
ce.get<Message::Components::Transfer::File>() = std::make_unique<File2RWMapped>(
|
o.get<Message::Components::Transfer::File>() = std::make_unique<File2RWMapped>(
|
||||||
ce.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(),
|
o.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(),
|
||||||
info.file_size
|
info.file_size
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// good chunk
|
// good chunk
|
||||||
// TODO: have wasted + metadata
|
// TODO: have wasted + metadata
|
||||||
ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size;
|
o.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// queue chunk have for all participants
|
// queue chunk have for all participants
|
||||||
// HACK: send immediatly to all participants
|
// HACK: send immediatly to all participants
|
||||||
for (const auto c_part : ce.get<Components::SuspectedParticipants>().participants) {
|
for (const auto c_part : o.get<Components::SuspectedParticipants>().participants) {
|
||||||
if (!_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c_part)) {
|
if (!_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c_part)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto [part_group_number, part_peer_number] = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c_part);
|
const auto [part_group_number, part_peer_number] = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c_part);
|
||||||
|
|
||||||
const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;
|
const auto& info_hash = o.get<Components::FT1InfoSHA1Hash>().hash;
|
||||||
|
|
||||||
// convert size_t to uint32_t
|
// convert size_t to uint32_t
|
||||||
const std::vector<uint32_t> chunk_indices {
|
const std::vector<uint32_t> chunk_indices {
|
||||||
std::get<ReceivingTransfer::Chunk>(tv).chunk_indices.cbegin(),
|
transfer.getChunk().chunk_indices.cbegin(),
|
||||||
std::get<ReceivingTransfer::Chunk>(tv).chunk_indices.cend()
|
transfer.getChunk().chunk_indices.cend()
|
||||||
};
|
};
|
||||||
|
|
||||||
_neep.send_ft1_have(
|
_neep.send_ft1_have(
|
||||||
@ -1019,14 +990,14 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
|||||||
|
|
||||||
// remove from requested
|
// remove from requested
|
||||||
// TODO: remove at init and track running transfers differently
|
// TODO: remove at init and track running transfers differently
|
||||||
for (const auto it : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
|
for (const auto it : transfer.getChunk().chunk_indices) {
|
||||||
ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
|
o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
updateMessages(ce); // mostly for received bytes
|
updateMessages(o); // mostly for received bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
peer_transfers.erase(e.transfer_id);
|
_receiving_transfers.removePeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -1468,6 +1439,8 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: nfcft1 should have fired receive/send done events for all them running transfers
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||||
|
|
||||||
#include "./ft1_sha1_info.hpp"
|
#include "./ft1_sha1_info.hpp"
|
||||||
|
#include "./receiving_transfers.hpp"
|
||||||
|
|
||||||
#include <entt/entity/registry.hpp>
|
#include <entt/entity/registry.hpp>
|
||||||
#include <entt/entity/handle.hpp>
|
#include <entt/entity/handle.hpp>
|
||||||
@ -68,27 +69,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
|||||||
// key is groupid + peerid
|
// key is groupid + peerid
|
||||||
entt::dense_map<uint64_t, entt::dense_map<uint8_t, SendingTransfer>> _sending_transfers;
|
entt::dense_map<uint64_t, entt::dense_map<uint8_t, SendingTransfer>> _sending_transfers;
|
||||||
|
|
||||||
struct ReceivingTransfer {
|
ReceivingTransfers _receiving_transfers;
|
||||||
struct Info {
|
|
||||||
ObjectHandle content;
|
|
||||||
// copy of info data
|
|
||||||
// too large?
|
|
||||||
std::vector<uint8_t> info_data;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct Chunk {
|
|
||||||
ObjectHandle content;
|
|
||||||
std::vector<size_t> chunk_indices;
|
|
||||||
// or data?
|
|
||||||
// if memmapped, this would be just a pointer
|
|
||||||
};
|
|
||||||
|
|
||||||
std::variant<Info, Chunk> v;
|
|
||||||
|
|
||||||
float time_since_activity {0.f};
|
|
||||||
};
|
|
||||||
// key is groupid + peerid
|
|
||||||
entt::dense_map<uint64_t, entt::dense_map<uint8_t, ReceivingTransfer>> _receiving_transfers;
|
|
||||||
|
|
||||||
// makes request rotate around open content
|
// makes request rotate around open content
|
||||||
std::deque<ObjectHandle> _queue_content_want_info;
|
std::deque<ObjectHandle> _queue_content_want_info;
|
||||||
|
Loading…
Reference in New Issue
Block a user