Compare commits: 9b4a258e48...master (12 commits)

Commits:
a0ac08df86
845586c2db
099abc3a09
211cce65de
04875e999f
96669e0202
8c221a3e17
100f4f68c0
8237f902c9
80d0db546d
7812e3a413
4895a3c03a
external/solanaceae_message3 (vendored, 2 lines changed)
Submodule external/solanaceae_message3 updated: d2d6cfbf53...53d65a0685

external/solanaceae_tox (vendored, 2 lines changed)
Submodule external/solanaceae_tox updated: de332421f7...dd596bdad8
@@ -28,6 +28,15 @@ std::ostream& operator<<(std::ostream& out, const SHA1Digest& v) {
 	return out;
 }
 
+size_t FT1InfoSHA1::chunkSize(size_t chunk_index) const {
+	if (chunk_index+1 == chunks.size()) {
+		// last chunk
+		return file_size - chunk_index * chunk_size;
+	} else {
+		return chunk_size;
+	}
+}
+
 std::vector<uint8_t> FT1InfoSHA1::toBuffer(void) const {
 	std::vector<uint8_t> buffer;
 
@@ -46,6 +46,8 @@ struct FT1InfoSHA1 {
 	uint32_t chunk_size {128*1024}; // 128KiB for now
 	std::vector<SHA1Digest> chunks;
 
+	size_t chunkSize(size_t chunk_index) const;
+
 	std::vector<uint8_t> toBuffer(void) const;
 	void fromBuffer(const std::vector<uint8_t>& buffer);
 };
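Note: the new FT1InfoSHA1::chunkSize() exists so the final chunk is not over-read. A minimal standalone check of that remainder math, assuming the 128KiB default above and hypothetical sizes:

	#include <cassert>
	#include <cstddef>

	int main(void) {
		const size_t chunk_size = 128*1024;
		const size_t file_size = 300*1024; // 2 full chunks + remainder
		const size_t chunk_count = (file_size + chunk_size - 1) / chunk_size; // 3

		// same formula chunkSize() uses for the last chunk
		const size_t last = file_size - (chunk_count - 1) * chunk_size;
		assert(last == 44*1024); // 300KiB - 256KiB
		return 0;
	}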
src/ngcft1.cpp (105 lines changed)
@@ -182,7 +182,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
 				if (timeouts_set.count({idx, id})) {
 					// TODO: can fail
 					sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
-					peer.cca.onLoss({idx, id}, false);
+					peer.cca->onLoss({idx, id}, false);
 					time_since_activity = 0.f;
 					timeouts_set.erase({idx, id});
 				}
@@ -201,7 +201,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
 
 				// clean up cca
 				tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
-					peer.cca.onLoss({idx, id}, true);
+					peer.cca->onLoss({idx, id}, true);
 					timeouts_set.erase({idx, id});
 				});
 
@@ -212,7 +212,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
 
 				// if chunks in flight < window size (2)
 				//while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) {
-				int64_t can_packet_size {static_cast<int64_t>(peer.cca.canSend())};
+				int64_t can_packet_size {static_cast<int64_t>(peer.cca->canSend())};
 				//if (can_packet_size) {
 					//std::cerr << "FT: can_packet_size: " << can_packet_size;
 				//}
@@ -226,7 +226,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
 				size_t chunk_size = std::min<size_t>({
 					//496u,
 					//996u,
-					peer.cca.MAXIMUM_SEGMENT_DATA_SIZE,
+					peer.cca->MAXIMUM_SEGMENT_DATA_SIZE,
 					static_cast<size_t>(can_packet_size),
 					tf.file_size - tf.file_size_current
 				});
@@ -259,7 +259,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
 
 				uint16_t seq_id = tf.ssb.add(std::move(new_data));
 				sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
-				peer.cca.onSent({idx, seq_id}, chunk_size);
+				peer.cca->onSent({idx, seq_id}, chunk_size);
 
 #if defined(EXTRA_LOGGING) && EXTRA_LOGGING == 1
 				fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id);
@@ -280,7 +280,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
 			//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
 			if (timeouts_set.count({idx, id})) {
 				sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
-				peer.cca.onLoss({idx, id}, false);
+				peer.cca->onLoss({idx, id}, false);
 				time_since_activity = 0.f;
 				timeouts_set.erase({idx, id});
 			}
@@ -292,7 +292,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
 
 			// clean up cca
 			tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
-				peer.cca.onLoss({idx, id}, true);
+				peer.cca->onLoss({idx, id}, true);
 				timeouts_set.erase({idx, id});
 			});
 
@@ -308,7 +308,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
 }
 
 void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
-	auto timeouts = peer.cca.getTimeouts();
+	auto timeouts = peer.cca->getTimeouts();
 	std::set<LEDBAT::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
 
 	for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) {
@@ -332,6 +332,8 @@ NGCFT1::NGCFT1(
 	_neep.subscribe(this, NGCEXT_Event::FT1_DATA);
 	_neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK);
 	_neep.subscribe(this, NGCEXT_Event::FT1_MESSAGE);
+
+	_tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_PEER_EXIT);
 }
 
 void NGCFT1::iterate(float time_delta) {
@@ -423,7 +425,9 @@ bool NGCFT1::NGC_FT1_send_message_public(
 }
 
 bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) {
+//#if !NDEBUG
 	std::cout << "NGCFT1: FT1_REQUEST fk:" << e.file_kind << " [" << bin2hex(e.file_id) << "]\n";
+//#endif
 
 	// .... just rethrow??
 	// TODO: dont
@@ -438,7 +442,9 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) {
 }
 
 bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
+//#if !NDEBUG
 	std::cout << "NGCFT1: FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << int(e.transfer_id) << " [" << bin2hex(e.file_id) << "]\n";
+//#endif
 
 	bool accept = false;
 	dispatch(
@@ -464,7 +470,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
 
 	auto& peer = groups[e.group_number].peers[e.peer_number];
 	if (peer.recv_transfers[e.transfer_id].has_value()) {
-		std::cerr << "NGCFT1 warning: overwriting existing recv_transfer " << e.transfer_id << "\n";
+		std::cerr << "NGCFT1 warning: overwriting existing recv_transfer " << int(e.transfer_id) << "\n";
 	}
 
 	peer.recv_transfers[e.transfer_id] = Group::Peer::RecvTransfer{
@@ -480,7 +486,9 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
 }
 
 bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
+//#if !NDEBUG
 	std::cout << "NGCFT1: FT1_INIT_ACK\n";
+//#endif
 
 	// we now should start sending data
 
@@ -511,7 +519,9 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
 }
 
 bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
+#if !NDEBUG
 	std::cout << "NGCFT1: FT1_DATA\n";
+#endif
 
 	if (e.data.empty()) {
 		std::cerr << "NGCFT1 error: data of size 0!\n";
@@ -560,11 +570,26 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
 		sendPKG_FT1_DATA_ACK(e.group_number, e.peer_number, e.transfer_id, ack_seq_ids.data(), ack_seq_ids.size());
 	}
 
+
+	if (transfer.file_size_current == transfer.file_size) {
+		// TODO: set all data received, and clean up
+		//transfer.state = Group::Peer::RecvTransfer::State::RECV;
+		dispatch(
+			NGCFT1_Event::recv_done,
+			Events::NGCFT1_recv_done{
+				e.group_number, e.peer_number,
+				e.transfer_id
+			}
+		);
+	}
+
 	return true;
 }
 
 bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
+#if !NDEBUG
 	std::cout << "NGCFT1: FT1_DATA_ACK\n";
+#endif
 
 	if (!groups.count(e.group_number)) {
 		std::cerr << "NGCFT1 warning: data_ack for unknown group\n";
@@ -598,7 +623,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
 		seqs.push_back({e.transfer_id, it});
 		transfer.ssb.erase(it);
 	}
-	peer.cca.onAck(seqs);
+	peer.cca->onAck(seqs);
 
 	// delete if all packets acked
 	if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) {
@@ -632,3 +657,63 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_message& e) {
 	);
 }
 
+bool NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
+	const auto group_number = tox_event_group_peer_exit_get_group_number(e);
+	const auto peer_number = tox_event_group_peer_exit_get_peer_id(e);
+
+	// peer disconnected, end all transfers
+
+	if (!groups.count(group_number)) {
+		return false;
+	}
+
+	auto& group = groups.at(group_number);
+
+	if (!group.peers.count(peer_number)) {
+		return false;
+	}
+
+	auto& peer = group.peers.at(peer_number);
+
+	for (size_t i = 0; i < peer.send_transfers.size(); i++) {
+		auto& it_opt = peer.send_transfers.at(i);
+		if (!it_opt.has_value()) {
+			continue;
+		}
+
+		std::cout << "NGCFT1: sending " << int(i) << " canceled bc peer offline\n";
+		dispatch(
+			NGCFT1_Event::send_done,
+			Events::NGCFT1_send_done{
+				group_number, peer_number,
+				static_cast<uint8_t>(i),
+			}
+		);
+
+		it_opt.reset();
+	}
+
+	for (size_t i = 0; i < peer.recv_transfers.size(); i++) {
+		auto& it_opt = peer.recv_transfers.at(i);
+		if (!it_opt.has_value()) {
+			continue;
+		}
+
+		std::cout << "NGCFT1: receiving " << int(i) << " canceled bc peer offline\n";
+		dispatch(
+			NGCFT1_Event::recv_done,
+			Events::NGCFT1_recv_done{
+				group_number, peer_number,
+				static_cast<uint8_t>(i),
+			}
+		);
+
+		it_opt.reset();
+	}
+
+	// reset cca
+	peer.cca = std::make_unique<LEDBAT>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
+
+	return false;
+}
@@ -15,6 +15,7 @@
 
 #include <map>
 #include <set>
+#include <memory>
 
 namespace Events {
 
@@ -138,7 +139,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
 
 	struct Group {
 		struct Peer {
-			LEDBAT cca{500-4}; // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
+			std::unique_ptr<LEDBAT> cca = std::make_unique<LEDBAT>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
 
 			struct RecvTransfer {
 				uint32_t file_kind;
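Note: holding the LEDBAT congestion controller behind a std::unique_ptr is what makes the new peer-exit handling work: the controller does not need to be copy- or move-assignable, yet all of its in-flight state (window, timeouts, sent-sequence bookkeeping) can be discarded in a single assignment, exactly as the new onToxEvent handler above does:

	// reset cca (from NGCFT1::onToxEvent above); destroys the old
	// controller and starts a fresh one with a clean window
	peer.cca = std::make_unique<LEDBAT>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4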
@@ -244,6 +245,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
 		bool onEvent(const Events::NGCEXT_ft1_message&) override;
 
 	protected:
+		bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;
 		//bool onToxEvent(const Tox_Event_Group_Custom_Packet* e) override;
 		//bool onToxEvent(const Tox_Event_Group_Custom_Private_Packet* e) override;
 };
@@ -8,6 +8,7 @@
 #include <solanaceae/tox_messages/components.hpp>
 
 #include <solanaceae/message3/file_r_file.hpp>
+#include <solanaceae/message3/file_rw_file.hpp>
 
 #include "./ft1_sha1_info.hpp"
 #include "./hash_utils.hpp"
@@ -18,6 +19,7 @@
 
 #include <iostream>
 #include <variant>
+#include <filesystem>
 
 namespace Message::Components {
 
@@ -46,13 +48,17 @@ namespace Components {
 		std::vector<bool> have_chunk;
 		bool have_all {false};
 		size_t have_count {0};
-		entt::dense_map<SHA1Digest, size_t> chunk_hash_to_index;
+		entt::dense_map<SHA1Digest, std::vector<size_t>> chunk_hash_to_index;
 
-		std::optional<size_t> chunkIndex(const SHA1Digest& hash) const;
-		size_t chunkSize(size_t chunk_index) const;
+		std::vector<size_t> chunkIndices(const SHA1Digest& hash) const;
 		bool haveChunk(const SHA1Digest& hash) const;
 	};
 
+	struct FT1ChunkSHA1Requested {
+		// requested chunks with a timer since last request
+		entt::dense_map<size_t, float> chunks;
+	};
+
 	struct SuspectedParticipants {
 		entt::dense_set<Contact3> participants;
 	};
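Note: the index map becoming one-to-many reflects that identical chunk contents can occur at several positions in a file. A small illustration with hypothetical digests hash_a/hash_b/hash_c:

	entt::dense_map<SHA1Digest, std::vector<size_t>> chunk_hash_to_index;
	// file layout: [A][B][A][C], chunk A repeats
	chunk_hash_to_index[hash_a] = {0, 2}; // one digest, two chunk slots
	chunk_hash_to_index[hash_b] = {1};
	chunk_hash_to_index[hash_c] = {3};
	// chunkIndices(hash_a) returns both slots, so one received copy of A
	// can satisfy chunk 0 as well as chunk 2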
@@ -61,14 +67,22 @@ namespace Components {
 		float timer {0.f};
 	};
 
+	struct ReadHeadHint {
+		// points to the first byte we want
+		// this is just a hint, that can be set from outside
+		// to guide the sequential "piece picker" strategy
+		// the strategy *should* set this to the first byte we dont yet have
+		uint64_t offset_into_file {0u};
+	};
+
 } // Components
 
-std::optional<size_t> Components::FT1ChunkSHA1Cache::chunkIndex(const SHA1Digest& hash) const {
+std::vector<size_t> Components::FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const {
 	const auto it = chunk_hash_to_index.find(hash);
 	if (it != chunk_hash_to_index.cend()) {
 		return it->second;
 	} else {
-		return std::nullopt;
+		return {};
 	}
 }
 
@@ -77,8 +91,9 @@ bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
 		return true;
 	}
 
-	if (auto i_opt = chunkIndex(hash); i_opt.has_value()) {
-		return have_chunk[i_opt.value()];
+	if (auto i_vec = chunkIndices(hash); !i_vec.empty()) {
+		// TODO: should i test all?
+		return have_chunk[i_vec.front()];
 	}
 
 	// not part of this file
@@ -126,6 +141,14 @@ void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
 	if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
 		msg.emplace_or_replace<Message::Components::Transfer::BytesSent>(ce.get<Message::Components::Transfer::BytesSent>());
 	}
+	if (ce.all_of<Message::Components::Transfer::BytesReceived>()) {
+		msg.emplace_or_replace<Message::Components::Transfer::BytesReceived>(ce.get<Message::Components::Transfer::BytesReceived>());
+	}
+	if (ce.all_of<Message::Components::Transfer::TagPaused>()) {
+		msg.emplace_or_replace<Message::Components::Transfer::TagPaused>();
+	} else {
+		msg.remove<Message::Components::Transfer::TagPaused>();
+	}
 	if (auto* cc = ce.try_get<Components::FT1ChunkSHA1Cache>(); cc != nullptr && cc->have_all) {
 		msg.emplace_or_replace<Message::Components::Transfer::TagHaveAll>();
 	}
@@ -134,6 +157,64 @@ void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
 	}
 }
 
+std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ContentHandle ce) {
+	// get a list of peers we can request this file from
+	// TODO: randomly request from non SuspectedParticipants
+	std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
+	for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
+		// TODO: sort by con state?
+		// prio to direct?
+		if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
+			continue;
+		}
+
+		if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
+			const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
+			tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
+		}
+	}
+
+	// 1 in 20 chance to ask random peer instead
+	// TODO: config + tweak
+	// TODO: save group in content to avoid the tox_peers list build
+	if (tox_peers.empty() || (_rng()%20) == 0) {
+		// meh
+		// HACK: determain group based on last tox_peers
+		if (!tox_peers.empty()) {
+			const uint32_t group_number = tox_peers.back().first;
+			auto gch = _tcm.getContactGroup(group_number);
+			assert(static_cast<bool>(gch));
+
+			std::vector<uint32_t> un_tox_peers;
+			for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
+				if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
+					continue;
+				}
+
+				if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
+					const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
+					un_tox_peers.push_back(tgpe.peer_number);
+				}
+			}
+			if (un_tox_peers.empty()) {
+				// no one online, we are out of luck
+			} else {
+				const size_t sample_i = _rng()%un_tox_peers.size();
+				const auto peer_number = un_tox_peers.at(sample_i);
+
+				return std::make_pair(group_number, peer_number);
+			}
+		}
+	} else {
+		const size_t sample_i = _rng()%tox_peers.size();
+		const auto [group_number, peer_number] = tox_peers.at(sample_i);
+
+		return std::make_pair(group_number, peer_number);
+	}
+
+	return std::nullopt;
+}
+
 SHA1_NGCFT1::SHA1_NGCFT1(
 	Contact3Registry& cr,
 	RegistryMessageModel& rmm,
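Note: both request paths in iterate() below now funnel through this helper; the call pattern looks like this:

	auto selected_peer_opt = selectPeerForRequest(ce);
	if (selected_peer_opt.has_value()) {
		const auto [group_number, peer_number] = selected_peer_opt.value();
		// ... send the info or chunk request to group_number:peer_number
	}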
@@ -145,6 +226,9 @@ SHA1_NGCFT1::SHA1_NGCFT1(
 	_nft(nft),
 	_tcm(tcm)
 {
+	// TODO: also create and destroy
+	_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
+
 	_nft.subscribe(this, NGCFT1_Event::recv_request);
 	_nft.subscribe(this, NGCFT1_Event::recv_init);
 	_nft.subscribe(this, NGCFT1_Event::recv_data);
@@ -169,8 +253,7 @@ void SHA1_NGCFT1::iterate(float delta) {
 
 			// if we have not heard for 10sec, timeout
 			if (it->second.time_since_activity >= 10.f) {
-				//std::cerr << "SHA1_NGCFT1 warning: sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n";
-				std::cerr << "SHA1_NGCFT1 warning: sending chunk tansfer timed out " << "." << int(it->first) << "\n";
+				std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n";
 				it = peer_it->second.erase(it);
 			} else {
 				it++;
@@ -184,18 +267,30 @@ void SHA1_NGCFT1::iterate(float delta) {
 			peer_it++;
 		}
 	}
-	//for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end();) {
-		//float& time_since_remove_activity = std::get<float>(*it);
-		//time_since_remove_activity += delta;
-
-		//// if we have not heard for 10sec, timeout
-		//if (time_since_remove_activity >= 10.f) {
-			//std::cerr << "SHA1 sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n";
-			//it = _transfers_sending_chunk.erase(it);
-		//} else {
-			//it++;
-		//}
-	//}
+	// receiving transfers
+	for (auto peer_it = _receiving_transfers.begin(); peer_it != _receiving_transfers.end();) {
+		for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
+			it->second.time_since_activity += delta;
+
+			// if we have not heard for 10sec, timeout
+			if (it->second.time_since_activity >= 10.f) {
+				std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n";
+				// TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading
+				//it->second.v
+				it = peer_it->second.erase(it);
+			} else {
+				it++;
+			}
+		}
+
+		if (peer_it->second.empty()) {
+			// cleanup unused peers too agressive?
+			peer_it = _receiving_transfers.erase(peer_it);
+		} else {
+			peer_it++;
+		}
+	}
 
 	// queued requests
 	for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) {
@@ -225,6 +320,20 @@ void SHA1_NGCFT1::iterate(float delta) {
 			_contentr.remove<Components::ReRequestInfoTimer>(e);
 		}
 	}
+	{ // requested chunk timers
+		_contentr.view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
+			for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
+				it->second += delta;
+
+				// 20sec, TODO: config
+				if (it->second >= 20.f) {
+					it = ftchunk_requested.chunks.erase(it);
+				} else {
+					it++;
+				}
+			}
+		});
+	}
 }
 
 // if we have not reached the total cap for transfers
@@ -244,23 +353,41 @@ void SHA1_NGCFT1::iterate(float delta) {
 	if (!_queue_requested_chunk.empty()) { // then check for chunk requests
 		const auto [group_number, peer_number, ce, chunk_hash, _] = _queue_requested_chunk.front();
 
-		auto chunk_idx_opt = ce.get<Components::FT1ChunkSHA1Cache>().chunkIndex(chunk_hash);
-		if (chunk_idx_opt.has_value()) {
-			const auto& info = ce.get<Components::FT1InfoSHA1>();
-
-			uint8_t transfer_id {0};
-			if (_nft.NGC_FT1_send_init_private(
-				group_number, peer_number,
-				static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
-				chunk_hash.data.data(), chunk_hash.size(),
-				chunkSize(info, chunk_idx_opt.value()),
-				&transfer_id
-			)) {
-				_sending_transfers
-					[combineIds(group_number, peer_number)]
-					[transfer_id] // TODO: also save index?
-					.v = SendingTransfer::Chunk{ce, chunk_idx_opt.value() * info.chunk_size};
-			}
+		auto chunk_idx_vec = ce.get<Components::FT1ChunkSHA1Cache>().chunkIndices(chunk_hash);
+		if (!chunk_idx_vec.empty()) {
+
+			// check if already sending
+			bool already_sending_to_this_peer = false;
+			if (_sending_transfers.count(combineIds(group_number, peer_number))) {
+				for (const auto& [_2, t] : _sending_transfers.at(combineIds(group_number, peer_number))) {
+					if (std::holds_alternative<SendingTransfer::Chunk>(t.v)) {
+						const auto& v = std::get<SendingTransfer::Chunk>(t.v);
+						if (v.content == ce && v.chunk_index == chunk_idx_vec.front()) {
+							// already sending
+							already_sending_to_this_peer = true;
+							break;
+						}
+					}
+				}
+			}
+
+			if (!already_sending_to_this_peer) {
+				const auto& info = ce.get<Components::FT1InfoSHA1>();
+
+				uint8_t transfer_id {0};
+				if (_nft.NGC_FT1_send_init_private(
+					group_number, peer_number,
+					static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
+					chunk_hash.data.data(), chunk_hash.size(),
+					chunkSize(info, chunk_idx_vec.front()),
+					&transfer_id
+				)) {
+					_sending_transfers
+						[combineIds(group_number, peer_number)]
+						[transfer_id] // TODO: also save index?
+						.v = SendingTransfer::Chunk{ce, chunk_idx_vec.front()};
+				}
+			} // else just remove from queue
 		}
 		// remove from queue regardless
 		_queue_requested_chunk.pop_front();
@@ -279,68 +406,9 @@ void SHA1_NGCFT1::iterate(float delta) {
 		assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
 		assert(ce.all_of<Components::FT1InfoSHA1Hash>());
 
-		// get a list of peers we can request this file from
-		// TODO: randomly request from non SuspectedParticipants
-		std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
-		for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
-			// TODO: sort by con state?
-			// prio to direct?
-			if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
-				continue;
-			}
-
-			if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
-				const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
-				tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
-			}
-		}
-
-		// 1 in 20 chance to ask random peer instead
-		// TODO: config + tweak
-		// TODO: save group in content to avoid the tox_peers list build
-		if (tox_peers.empty() || (_rng()%20) == 0) {
-			// meh
-			// HACK: determain group based on last tox_peers
-			if (!tox_peers.empty()) {
-				const uint32_t group_number = tox_peers.back().first;
-				auto gch = _tcm.getContactGroup(group_number);
-				assert(static_cast<bool>(gch));
-
-				std::vector<uint32_t> un_tox_peers;
-				for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
-					if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
-						continue;
-					}
-
-					if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
-						const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
-						un_tox_peers.push_back(tgpe.peer_number);
-					}
-				}
-				if (un_tox_peers.empty()) {
-					// no one online, we are out of luck
-				} else {
-					const size_t sample_i = _rng()%un_tox_peers.size();
-					const auto peer_number = un_tox_peers.at(sample_i);
-
-					//const auto& info = msg.get<Components::FT1InfoSHA1>();
-					const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;
-
-					_nft.NGC_FT1_send_request_private(
-						group_number, peer_number,
-						static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
-						info_hash.data(), info_hash.size()
-					);
-					ce.emplace<Components::ReRequestInfoTimer>(0.f);
-
-					_queue_content_want_info.pop_front();
-
-					std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << " (rng)\n";
-				}
-			}
-		} else {
-			const size_t sample_i = _rng()%tox_peers.size();
-			const auto [group_number, peer_number] = tox_peers.at(sample_i);
+		auto selected_peer_opt = selectPeerForRequest(ce);
+		if (selected_peer_opt.has_value()) {
+			const auto [group_number, peer_number] = selected_peer_opt.value();
 
 			//const auto& info = msg.get<Components::FT1InfoSHA1>();
 			const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;
@@ -357,10 +425,166 @@
 			std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
 		}
-	}
+	} else if (!_queue_content_want_chunk.empty()) {
+		const auto ce = _queue_content_want_chunk.front();
+
+		auto& requested_chunks = ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
+		if (requested_chunks.size() < _max_pending_requests) {
+
+			// select chunk/make sure we still need one
+			auto selected_peer_opt = selectPeerForRequest(ce);
+			if (selected_peer_opt.has_value()) {
+				const auto [group_number, peer_number] = selected_peer_opt.value();
+				//std::cout << "SHA1_NGCFT1: should ask " << group_number << ":" << peer_number << " for content here\n";
+				auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
+				const auto& info = ce.get<Components::FT1InfoSHA1>();
+
+				// naive, choose first chunk we dont have (double requests!!)
+				for (size_t chunk_idx = 0; chunk_idx < cc.have_chunk.size(); chunk_idx++) {
+					if (cc.have_chunk[chunk_idx]) {
+						continue;
+					}
+
+					// check by hash
+					if (cc.haveChunk(info.chunks.at(chunk_idx))) {
+						// TODO: fix this, a completed chunk should fill all the indecies it occupies
+						cc.have_chunk[chunk_idx] = true;
+						cc.have_count += 1;
+						if (cc.have_count == info.chunks.size()) {
+							cc.have_all = true;
+							cc.have_chunk.clear();
+							break;
+						}
+						continue;
+					}
+
+					if (requested_chunks.count(chunk_idx)) {
+						// already requested
+						continue;
+					}
+
+					// request chunk_idx
+					_nft.NGC_FT1_send_request_private(
+						group_number, peer_number,
+						static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
+						info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size()
+					);
+					requested_chunks[chunk_idx] = 0.f;
+					std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n";
+
+					break;
+				}
+
+				// ...
+
+				// TODO: properly determine
+				if (!cc.have_all) {
+					_queue_content_want_chunk.push_back(ce);
+				}
+				_queue_content_want_chunk.pop_front();
+			}
+		}
+	}
 }
 
+bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
+	// see tox_transfer_manager.cpp for reference
+	if (!e.e.all_of<Message::Components::Transfer::ActionAccept, Message::Components::Content>()) {
+		return false;
+	}
+
+	//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path);
+	auto ce = e.e.get<Message::Components::Content>();
+
+	//if (!ce.all_of<Components::FT1InfoSHA1, Components::FT1ChunkSHA1Cache>()) {
+	if (!ce.all_of<Components::FT1InfoSHA1>()) {
+		// not ready to load yet, skip
+		return false;
+	}
+	assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
+	assert(!ce.all_of<Message::Components::Transfer::File>());
+
+	// first, open file for write(+readback)
+	std::string full_file_path{e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path};
+	// TODO: replace with filesystem or something
+	// TODO: ensure dir exists
+	if (full_file_path.back() != '/') {
+		full_file_path += "/";
+	}
+
+	std::filesystem::create_directories(full_file_path);
+
+	const auto& info = ce.get<Components::FT1InfoSHA1>();
+	full_file_path += info.file_name;
+
+	ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{full_file_path});
+
+	std::unique_ptr<FileRWFile> file_impl;
+	const bool file_exists = std::filesystem::exists(full_file_path);
+
+	{
+		const bool truncate = !file_exists;
+		file_impl = std::make_unique<FileRWFile>(full_file_path, info.file_size, truncate);
+	}
+
+	if (!file_impl->isGood()) {
+		std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
+		//e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop
+		return false;
+	}
+
+	{ // next, create chuck cache and check for existing data
+		auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
+		cc.have_all = false;
+		cc.have_count = 0;
+
+		cc.chunk_hash_to_index.clear(); // if copy pasta
+
+		if (file_exists) {
+			// iterate existing file
+			for (size_t i = 0; i < info.chunks.size(); i++) {
+				auto existing_data = file_impl->read(i*info.chunk_size, info.chunkSize(i));
+				// TODO: avoid copy
+				cc.have_chunk.push_back(
+					SHA1Digest{hash_sha1(existing_data.data(), existing_data.size())} == info.chunks.at(i)
+				);
+				if (cc.have_chunk.back()) {
+					cc.have_count += 1;
+				}
+
+				_chunks[info.chunks[i]] = ce;
+				cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
+			}
+
+			if (cc.have_count == info.chunks.size()) {
+				cc.have_all = true;
+			}
+		} else {
+			for (size_t i = 0; i < info.chunks.size(); i++) {
+				cc.have_chunk.push_back(false);
+				_chunks[info.chunks[i]] = ce;
+				cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
+			}
+		}
+
+		if (!cc.have_all) {
+			// now, enque
+			_queue_content_want_chunk.push_back(ce);
+		}
+	}
+
+	ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
+
+	ce.remove<Message::Components::Transfer::TagPaused>();
+
+	// should?
+	e.e.remove<Message::Components::Transfer::ActionAccept>();
+
+	updateMessages(ce);
+
+	return false;
+}
+
 bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
 	// only interested in sha1
 	if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) {
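Note: the "fill all the indecies" TODO in the picker above could be addressed with a helper along these lines (hypothetical sketch, not part of this change set; recv_done below already loops over chunk_indices in the same spirit):

	// mark every slot sharing the finished chunk's hash, so duplicate
	// positions are not re-requested later
	void markHaveAllIndices(Components::FT1ChunkSHA1Cache& cc, const std::vector<size_t>& idx_vec) {
		for (const size_t i : idx_vec) {
			if (!cc.have_chunk.at(i)) {
				cc.have_chunk.at(i) = true;
				cc.have_count += 1;
			}
		}
	}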
@@ -423,7 +647,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
 		ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
 	}
 
-	assert(msg.all_of<Components::FT1ChunkSHA1Cache>());
+	assert(ce.all_of<Components::FT1ChunkSHA1Cache>());
 
 	if (!ce.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
 		// we dont have the chunk
@@ -445,11 +669,121 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
 		return false;
 	}
 
-	return false;
+	// TODO: make sure we requested this?
+
+	if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) {
+		SHA1Digest sha1_info_hash {e.file_id, e.file_id_size};
+		if (!_info_to_content.count(sha1_info_hash)) {
+			// no idea about this content
+			return false;
+		}
+
+		auto ce = _info_to_content.at(sha1_info_hash);
+
+		if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data, Components::FT1ChunkSHA1Cache>()) {
+			// we already have the info (should)
+			return false;
+		}
+
+		// TODO: check if e.file_size too large / ask for permission
+		if (e.file_size > 100*1024*1024) {
+			// a info size of 100MiB is ~640GiB for a 128KiB chunk size (default)
+			return false;
+		}
+
+		_receiving_transfers
+			[combineIds(e.group_number, e.peer_number)]
+			[e.transfer_id]
+			.v = ReceivingTransfer::Info{ce, std::vector<uint8_t>(e.file_size)};
+
+		e.accept = true;
+	} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
+		SHA1Digest sha1_chunk_hash {e.file_id, e.file_id_size};
+
+		if (!_chunks.count(sha1_chunk_hash)) {
+			// no idea about this content
+			return false;
+		}
+
+		auto ce = _chunks.at(sha1_chunk_hash);
+
+		// CHECK IF TRANSFER IN PROGESS!!
+
+		{ // they have the content (probably, might be fake, should move this to done)
+			const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
+			ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
+		}
+
+		assert(ce.all_of<Components::FT1InfoSHA1>());
+		assert(ce.all_of<Components::FT1ChunkSHA1Cache>());
+
+		const auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
+		if (cc.haveChunk(sha1_chunk_hash)) {
+			std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
+			// we have the chunk
+			return false;
+		}
+		// TODO: cache position
+
+		// calc offset_into_file
+		auto idx_vec = cc.chunkIndices(sha1_chunk_hash);
+		assert(!idx_vec.empty());
+
+		const auto& info = ce.get<Components::FT1InfoSHA1>();
+
+		// TODO: check e.file_size
+		assert(e.file_size == info.chunkSize(idx_vec.front()));
+
+		_receiving_transfers
+			[combineIds(e.group_number, e.peer_number)]
+			[e.transfer_id]
+			.v = ReceivingTransfer::Chunk{ce, idx_vec};
+
+		e.accept = true;
+
+		std::cout << "SHA1_NGCFT1: accepted chunk [" << SHA1Digest{sha1_chunk_hash} << "]\n";
+	} else {
+		assert(false && "unhandled case");
+	}
+
+	return true;
 }
 
 bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
-	return false;
+	if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
+		return false;
+	}
+
+	auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
+	if (!peer_transfers.count(e.transfer_id)) {
+		return false;
+	}
+
+	auto& tv = peer_transfers[e.transfer_id].v;
+	if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
+		auto& info_data = std::get<ReceivingTransfer::Info>(tv).info_data;
+		for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
+			info_data[i+e.data_offset] = e.data[i];
+		}
+	} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
+		auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
+
+		assert(ce.all_of<Message::Components::Transfer::File>());
+		auto* file = ce.get<Message::Components::Transfer::File>().get();
+		assert(file != nullptr);
+
+		for (const auto chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
+			const auto offset_into_file = chunk_index * ce.get<Components::FT1InfoSHA1>().chunk_size;
+
+			// TODO: avoid temporary copy
+			// TODO: check return
+			file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size});
+		}
+	} else {
+		assert(false && "unhandled case");
+	}
+
+	return true;
 }
 
 bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
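Note: the "~640GiB" in the 100MiB info-size guard above checks out, assuming roughly 20 bytes of SHA1 digest per chunk entry in the serialized info (a compile-time sketch, not code from this change):

	#include <cstdint>
	static_assert(
		(std::uint64_t{100*1024*1024} / 20) * std::uint64_t{128*1024}
			== std::uint64_t{640} * 1024*1024*1024,
		"100MiB of 20-byte digests at 128KiB per chunk addresses 640GiB");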
@@ -476,8 +810,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
 		}
 	} else if (std::holds_alternative<SendingTransfer::Chunk>(transfer.v)) {
 		auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
+		const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
 		// TODO: should we really use file?
-		const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read(chunk_transfer.offset_into_file + e.data_offset, e.data_size);
+		const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read((chunk_transfer.chunk_index * info.chunk_size) + e.data_offset, e.data_size);
 
 		// TODO: optimize
 		for (size_t i = 0; i < e.data_size && i < data.size(); i++) {
@@ -502,7 +837,119 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
 }
 
 bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
-	return false;
+	if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
+		return false;
+	}
+
+	auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
+	if (!peer_transfers.count(e.transfer_id)) {
+		return false;
+	}
+
+	const auto& tv = peer_transfers[e.transfer_id].v;
+	if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
+		auto& info = std::get<ReceivingTransfer::Info>(tv);
+		auto ce = info.content;
+
+		if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data>()) {
+			// we already have the info, discard
+			peer_transfers.erase(e.transfer_id);
+			return true;
+		}
+
+		// check if data matches hash
+		auto hash = hash_sha1(info.info_data.data(), info.info_data.size());
+
+		assert(ce.all_of<Components::FT1InfoSHA1Hash>());
+		if (ce.get<Components::FT1InfoSHA1Hash>().hash != hash) {
+			std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n";
+			// requeue info request
+			peer_transfers.erase(e.transfer_id);
+			return true;
+		}
+
+		const auto& info_data = ce.emplace_or_replace<Components::FT1InfoSHA1Data>(std::move(info.info_data)).data;
+		auto& ft_info = ce.emplace_or_replace<Components::FT1InfoSHA1>();
+		ft_info.fromBuffer(info_data);
+
+		{ // file info
+			// TODO: not overwrite fi? since same?
+			auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
+			file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
+			file_info.total_size = ft_info.file_size;
+		}
+
+		std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";
+
+		ce.remove<Components::ReRequestInfoTimer>();
+		if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) {
+			_queue_content_want_info.erase(it);
+		}
+
+		ce.emplace_or_replace<Message::Components::Transfer::TagPaused>();
+
+		updateMessages(ce);
+	} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
+		auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
+		const auto& info = ce.get<Components::FT1InfoSHA1>();
+		auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
+
+		// HACK: only check first chunk (they *should* all be the same)
+		const auto chunk_index = std::get<ReceivingTransfer::Chunk>(tv).chunk_indices.front();
+		const auto offset_into_file = chunk_index * info.chunk_size;
+
+		assert(chunk_index < info.chunks.size());
+		const auto chunk_size = info.chunkSize(chunk_index);
+		assert(offset_into_file+chunk_size <= info.file_size);
+
+		const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(offset_into_file, chunk_size);
+
+		// check hash of chunk
+		auto got_hash = hash_sha1(chunk_data.data(), chunk_data.size());
+		if (info.chunks.at(chunk_index) == got_hash) {
+			std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
+
+			// remove from requested
+			// TODO: remove at init and track running transfers differently
+			for (const auto it : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
+				ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
+			}
+
+			if (!cc.have_all) {
+				for (const auto inner_chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
+					if (!cc.have_all && !cc.have_chunk.at(inner_chunk_index)) {
+						cc.have_chunk.at(inner_chunk_index) = true;
+						cc.have_count += 1;
+						if (cc.have_count == info.chunks.size()) {
+							// debug check
+							for ([[maybe_unused]] const bool it : cc.have_chunk) {
+								assert(it);
+							}
+
+							cc.have_all = true;
+							cc.have_chunk.clear(); // not wasting memory
+							std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";
+						}
+
+						// good chunk
+						// TODO: have wasted + metadata
+						ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size();
+					}
+				}
+			} else {
+				std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
+			}
+		} else {
+			// bad chunk
+			// TODO: requeue?
+		}
+
+		updateMessages(ce); // mostly for received bytes
+	}
+
+	peer_transfers.erase(e.transfer_id);
+
+	return true;
 }
 
 bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
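Note: both transfer maps are keyed by combineIds(group_number, peer_number). The helper itself is not part of this diff; a plausible implementation (an assumption, packing the two u32 ids into one u64 key) would be:

	static uint64_t combineIds(uint32_t group_number, uint32_t peer_number) {
		return (uint64_t(group_number) << 32) | peer_number;
	}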
@@ -597,6 +1044,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
 		//ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
 	}
 	ce.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
+	reg_ptr->emplace<Message::Components::Content>(new_msg_e, ce);
 
 	ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
 
@@ -713,10 +1161,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 			cc.have_count = sha1_info.chunks.size(); // need?
 
 			_info_to_content[sha1_info_hash] = ce;
-			for (size_t i = sha1_info.chunks.size(); i > 0; i--) {
-				_chunks[sha1_info.chunks[i-1]] = ce;
-				// chunks can have more then 1 index ..., for now, build reverse and have the first index be the real index
-				cc.chunk_hash_to_index[sha1_info.chunks[i-1]] = i-1;
+			cc.chunk_hash_to_index.clear(); // for cpy pst
+			for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
+				_chunks[sha1_info.chunks[i]] = ce;
+				cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
 			}
 		}
 
@@ -740,7 +1188,24 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 			ce.emplace<Message::Components::Transfer::BytesSent>(0u);
 		}
 
 		ce.remove<Message::Components::Transfer::TagPaused>();
+
+		// we dont want the info anymore
+		ce.remove<Components::ReRequestInfoTimer>();
+		if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) {
+			_queue_content_want_info.erase(it);
+		}
+
+		// TODO: we dont want chunks anymore
+
+		// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
+		auto it = _queue_content_want_chunk.begin();
+		while (
+			it != _queue_content_want_chunk.end() &&
+			(it = std::find(it, _queue_content_want_chunk.end(), ce)) != _queue_content_want_chunk.end()
+		) {
+			it = _queue_content_want_chunk.erase(it);
+		}
 	} else {
 		ce = {_contentr, _contentr.create()};
 		_info_to_content[sha1_info_hash] = ce;
@@ -756,9 +1221,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 			cc.have_count = sha1_info.chunks.size(); // need?
 
 			_info_to_content[sha1_info_hash] = ce;
+			cc.chunk_hash_to_index.clear(); // for cpy pst
 			for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
 				_chunks[sha1_info.chunks[i]] = ce;
-				cc.chunk_hash_to_index[sha1_info.chunks[i]] = i;
+				cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
 			}
 		}
 
@@ -782,7 +1248,6 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 		return true;
 	}
 
-
 	const auto msg_e = reg_ptr->create();
 	reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
 	reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
@@ -798,6 +1263,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 	// file id would be sha1_info hash or something
 	//reg_ptr->emplace<Message::Components::Transfer::FileID>(e, file_id);
 
+	// remove? done in updateMessages() anyway
 	if (ce.all_of<Message::Components::Transfer::FileInfo>()) {
 		reg_ptr->emplace<Message::Components::Transfer::FileInfo>(msg_e, ce.get<Message::Components::Transfer::FileInfo>());
 	}
@@ -811,17 +1277,6 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 	// TODO: determine if this is true
 	//reg_ptr->emplace<Message::Components::Transfer::TagPaused>(e);
 
-#if 0
-	const auto friend_number = _cr.get<Contact::Components::ToxFriendEphemeral>(c).friend_number;
-	const auto&& [transfer_id, err] = _t.toxFileSend(friend_number, file_kind, file_impl->_file_size, file_id, file_name);
-	if (err == TOX_ERR_FILE_SEND_OK) {
-		reg_ptr->emplace<Message::Components::Transfer::ToxTransferFriend>(e, friend_number, transfer_id.value());
-		// TODO: add tag signifying init sent status?
-
-		toxFriendLookupAdd({*reg_ptr, e});
-	} // else queue?
-#endif
-
 	if (_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
 		const uint32_t group_number = _cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
 		uint32_t message_id = 0;
@@ -37,6 +37,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
 
 	// sha1 chunk index
 	// TODO: optimize lookup
+	// TODO: multiple contents. hashes might be unique, but data is not
 	entt::dense_map<SHA1Digest, ContentHandle> _chunks;
 
 	// group_number, peer_number, content, chunk_hash, timer
@@ -53,7 +54,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
 
 		struct Chunk {
 			ContentHandle content;
-			uint64_t offset_into_file;
+			size_t chunk_index; // <.< remove offset_into_file
+			//uint64_t offset_into_file;
 			// or data?
 			// if memmapped, this would be just a pointer
 		};
@@ -75,7 +77,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
 
 		struct Chunk {
 			ContentHandle content;
-			uint64_t offset_into_file;
+			std::vector<size_t> chunk_indices;
 			// or data?
 			// if memmapped, this would be just a pointer
 		};
@@ -95,11 +97,15 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
 
 	void updateMessages(ContentHandle ce);
 
+	std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ContentHandle ce);
+
 	public: // TODO: config
 		bool _udp_only {false};
 
-		size_t _max_concurrent_in {8};
-		size_t _max_concurrent_out {4};
+		size_t _max_concurrent_in {4};
+		size_t _max_concurrent_out {6};
+		// TODO: probably also includes running transfers rn (meh)
+		size_t _max_pending_requests {16}; // per content
 
 	public:
 		SHA1_NGCFT1(
@@ -111,6 +117,9 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
 
 	void iterate(float delta);
 
+	protected: // rmm events (actions)
+		bool onEvent(const Message::Events::MessageUpdated&) override;
+
 	protected: // events
 		bool onEvent(const Events::NGCFT1_recv_request&) override;
 		bool onEvent(const Events::NGCFT1_recv_init&) override;