Compare commits

..

12 Commits

8 changed files with 698 additions and 136 deletions

View File

@ -28,6 +28,15 @@ std::ostream& operator<<(std::ostream& out, const SHA1Digest& v) {
return out;
}
size_t FT1InfoSHA1::chunkSize(size_t chunk_index) const {
if (chunk_index+1 == chunks.size()) {
// last chunk
return file_size - chunk_index * chunk_size;
} else {
return chunk_size;
}
}
std::vector<uint8_t> FT1InfoSHA1::toBuffer(void) const {
std::vector<uint8_t> buffer;

View File

@ -46,6 +46,8 @@ struct FT1InfoSHA1 {
uint32_t chunk_size {128*1024}; // 128KiB for now
std::vector<SHA1Digest> chunks;
size_t chunkSize(size_t chunk_index) const;
std::vector<uint8_t> toBuffer(void) const;
void fromBuffer(const std::vector<uint8_t>& buffer);
};

View File

@ -182,7 +182,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
if (timeouts_set.count({idx, id})) {
// TODO: can fail
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca.onLoss({idx, id}, false);
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
}
@ -201,7 +201,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
// clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
peer.cca.onLoss({idx, id}, true);
peer.cca->onLoss({idx, id}, true);
timeouts_set.erase({idx, id});
});
@ -212,7 +212,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
// if chunks in flight < window size (2)
//while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) {
int64_t can_packet_size {static_cast<int64_t>(peer.cca.canSend())};
int64_t can_packet_size {static_cast<int64_t>(peer.cca->canSend())};
//if (can_packet_size) {
//std::cerr << "FT: can_packet_size: " << can_packet_size;
//}
@ -226,7 +226,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
size_t chunk_size = std::min<size_t>({
//496u,
//996u,
peer.cca.MAXIMUM_SEGMENT_DATA_SIZE,
peer.cca->MAXIMUM_SEGMENT_DATA_SIZE,
static_cast<size_t>(can_packet_size),
tf.file_size - tf.file_size_current
});
@ -259,7 +259,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
uint16_t seq_id = tf.ssb.add(std::move(new_data));
sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
peer.cca.onSent({idx, seq_id}, chunk_size);
peer.cca->onSent({idx, seq_id}, chunk_size);
#if defined(EXTRA_LOGGING) && EXTRA_LOGGING == 1
fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id);
@ -280,7 +280,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
if (timeouts_set.count({idx, id})) {
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca.onLoss({idx, id}, false);
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
}
@ -292,7 +292,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
// clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
peer.cca.onLoss({idx, id}, true);
peer.cca->onLoss({idx, id}, true);
timeouts_set.erase({idx, id});
});
@ -308,7 +308,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
}
void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
auto timeouts = peer.cca.getTimeouts();
auto timeouts = peer.cca->getTimeouts();
std::set<LEDBAT::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) {
@ -332,6 +332,8 @@ NGCFT1::NGCFT1(
_neep.subscribe(this, NGCEXT_Event::FT1_DATA);
_neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK);
_neep.subscribe(this, NGCEXT_Event::FT1_MESSAGE);
_tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_PEER_EXIT);
}
void NGCFT1::iterate(float time_delta) {
@ -423,7 +425,9 @@ bool NGCFT1::NGC_FT1_send_message_public(
}
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) {
//#if !NDEBUG
std::cout << "NGCFT1: FT1_REQUEST fk:" << e.file_kind << " [" << bin2hex(e.file_id) << "]\n";
//#endif
// .... just rethrow??
// TODO: dont
@ -438,7 +442,9 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) {
}
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
//#if !NDEBUG
std::cout << "NGCFT1: FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << int(e.transfer_id) << " [" << bin2hex(e.file_id) << "]\n";
//#endif
bool accept = false;
dispatch(
@ -464,7 +470,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
auto& peer = groups[e.group_number].peers[e.peer_number];
if (peer.recv_transfers[e.transfer_id].has_value()) {
std::cerr << "NGCFT1 warning: overwriting existing recv_transfer " << e.transfer_id << "\n";
std::cerr << "NGCFT1 warning: overwriting existing recv_transfer " << int(e.transfer_id) << "\n";
}
peer.recv_transfers[e.transfer_id] = Group::Peer::RecvTransfer{
@ -480,7 +486,9 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
}
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
//#if !NDEBUG
std::cout << "NGCFT1: FT1_INIT_ACK\n";
//#endif
// we now should start sending data
@ -511,7 +519,9 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
}
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
#if !NDEBUG
std::cout << "NGCFT1: FT1_DATA\n";
#endif
if (e.data.empty()) {
std::cerr << "NGCFT1 error: data of size 0!\n";
@ -560,11 +570,26 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
sendPKG_FT1_DATA_ACK(e.group_number, e.peer_number, e.transfer_id, ack_seq_ids.data(), ack_seq_ids.size());
}
if (transfer.file_size_current == transfer.file_size) {
// TODO: set all data received, and clean up
//transfer.state = Group::Peer::RecvTransfer::State::RECV;
dispatch(
NGCFT1_Event::recv_done,
Events::NGCFT1_recv_done{
e.group_number, e.peer_number,
e.transfer_id
}
);
}
return true;
}
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
#if !NDEBUG
std::cout << "NGCFT1: FT1_DATA_ACK\n";
#endif
if (!groups.count(e.group_number)) {
std::cerr << "NGCFT1 warning: data_ack for unknown group\n";
@ -598,7 +623,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
seqs.push_back({e.transfer_id, it});
transfer.ssb.erase(it);
}
peer.cca.onAck(seqs);
peer.cca->onAck(seqs);
// delete if all packets acked
if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) {
@ -632,3 +657,63 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_message& e) {
);
}
bool NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
const auto group_number = tox_event_group_peer_exit_get_group_number(e);
const auto peer_number = tox_event_group_peer_exit_get_peer_id(e);
// peer disconnected, end all transfers
if (!groups.count(group_number)) {
return false;
}
auto& group = groups.at(group_number);
if (!group.peers.count(peer_number)) {
return false;
}
auto& peer = group.peers.at(peer_number);
for (size_t i = 0; i < peer.send_transfers.size(); i++) {
auto& it_opt = peer.send_transfers.at(i);
if (!it_opt.has_value()) {
continue;
}
std::cout << "NGCFT1: sending " << int(i) << " canceled bc peer offline\n";
dispatch(
NGCFT1_Event::send_done,
Events::NGCFT1_send_done{
group_number, peer_number,
static_cast<uint8_t>(i),
}
);
it_opt.reset();
}
for (size_t i = 0; i < peer.recv_transfers.size(); i++) {
auto& it_opt = peer.recv_transfers.at(i);
if (!it_opt.has_value()) {
continue;
}
std::cout << "NGCFT1: receiving " << int(i) << " canceled bc peer offline\n";
dispatch(
NGCFT1_Event::recv_done,
Events::NGCFT1_recv_done{
group_number, peer_number,
static_cast<uint8_t>(i),
}
);
it_opt.reset();
}
// reset cca
peer.cca = std::make_unique<LEDBAT>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
return false;
}

View File

@ -15,6 +15,7 @@
#include <map>
#include <set>
#include <memory>
namespace Events {
@ -138,7 +139,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
struct Group {
struct Peer {
LEDBAT cca{500-4}; // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
std::unique_ptr<LEDBAT> cca = std::make_unique<LEDBAT>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
struct RecvTransfer {
uint32_t file_kind;
@ -244,6 +245,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
bool onEvent(const Events::NGCEXT_ft1_message&) override;
protected:
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;
//bool onToxEvent(const Tox_Event_Group_Custom_Packet* e) override;
//bool onToxEvent(const Tox_Event_Group_Custom_Private_Packet* e) override;
};

View File

@ -8,6 +8,7 @@
#include <solanaceae/tox_messages/components.hpp>
#include <solanaceae/message3/file_r_file.hpp>
#include <solanaceae/message3/file_rw_file.hpp>
#include "./ft1_sha1_info.hpp"
#include "./hash_utils.hpp"
@ -18,6 +19,7 @@
#include <iostream>
#include <variant>
#include <filesystem>
namespace Message::Components {
@ -46,13 +48,17 @@ namespace Components {
std::vector<bool> have_chunk;
bool have_all {false};
size_t have_count {0};
entt::dense_map<SHA1Digest, size_t> chunk_hash_to_index;
entt::dense_map<SHA1Digest, std::vector<size_t>> chunk_hash_to_index;
std::optional<size_t> chunkIndex(const SHA1Digest& hash) const;
size_t chunkSize(size_t chunk_index) const;
std::vector<size_t> chunkIndices(const SHA1Digest& hash) const;
bool haveChunk(const SHA1Digest& hash) const;
};
struct FT1ChunkSHA1Requested {
// requested chunks with a timer since last request
entt::dense_map<size_t, float> chunks;
};
struct SuspectedParticipants {
entt::dense_set<Contact3> participants;
};
@ -61,14 +67,22 @@ namespace Components {
float timer {0.f};
};
struct ReadHeadHint {
// points to the first byte we want
// this is just a hint, that can be set from outside
// to guide the sequential "piece picker" strategy
// the strategy *should* set this to the first byte we dont yet have
uint64_t offset_into_file {0u};
};
} // Components
std::optional<size_t> Components::FT1ChunkSHA1Cache::chunkIndex(const SHA1Digest& hash) const {
std::vector<size_t> Components::FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const {
const auto it = chunk_hash_to_index.find(hash);
if (it != chunk_hash_to_index.cend()) {
return it->second;
} else {
return std::nullopt;
return {};
}
}
@ -77,8 +91,9 @@ bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
return true;
}
if (auto i_opt = chunkIndex(hash); i_opt.has_value()) {
return have_chunk[i_opt.value()];
if (auto i_vec = chunkIndices(hash); !i_vec.empty()) {
// TODO: should i test all?
return have_chunk[i_vec.front()];
}
// not part of this file
@ -126,6 +141,14 @@ void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
msg.emplace_or_replace<Message::Components::Transfer::BytesSent>(ce.get<Message::Components::Transfer::BytesSent>());
}
if (ce.all_of<Message::Components::Transfer::BytesReceived>()) {
msg.emplace_or_replace<Message::Components::Transfer::BytesReceived>(ce.get<Message::Components::Transfer::BytesReceived>());
}
if (ce.all_of<Message::Components::Transfer::TagPaused>()) {
msg.emplace_or_replace<Message::Components::Transfer::TagPaused>();
} else {
msg.remove<Message::Components::Transfer::TagPaused>();
}
if (auto* cc = ce.try_get<Components::FT1ChunkSHA1Cache>(); cc != nullptr && cc->have_all) {
msg.emplace_or_replace<Message::Components::Transfer::TagHaveAll>();
}
@ -134,6 +157,64 @@ void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
}
}
std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ContentHandle ce) {
// get a list of peers we can request this file from
// TODO: randomly request from non SuspectedParticipants
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
// TODO: sort by con state?
// prio to direct?
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
}
}
// 1 in 20 chance to ask random peer instead
// TODO: config + tweak
// TODO: save group in content to avoid the tox_peers list build
if (tox_peers.empty() || (_rng()%20) == 0) {
// meh
// HACK: determain group based on last tox_peers
if (!tox_peers.empty()) {
const uint32_t group_number = tox_peers.back().first;
auto gch = _tcm.getContactGroup(group_number);
assert(static_cast<bool>(gch));
std::vector<uint32_t> un_tox_peers;
for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
un_tox_peers.push_back(tgpe.peer_number);
}
}
if (un_tox_peers.empty()) {
// no one online, we are out of luck
} else {
const size_t sample_i = _rng()%un_tox_peers.size();
const auto peer_number = un_tox_peers.at(sample_i);
return std::make_pair(group_number, peer_number);
}
}
} else {
const size_t sample_i = _rng()%tox_peers.size();
const auto [group_number, peer_number] = tox_peers.at(sample_i);
return std::make_pair(group_number, peer_number);
}
return std::nullopt;
}
SHA1_NGCFT1::SHA1_NGCFT1(
Contact3Registry& cr,
RegistryMessageModel& rmm,
@ -145,6 +226,9 @@ SHA1_NGCFT1::SHA1_NGCFT1(
_nft(nft),
_tcm(tcm)
{
// TODO: also create and destroy
_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
_nft.subscribe(this, NGCFT1_Event::recv_request);
_nft.subscribe(this, NGCFT1_Event::recv_init);
_nft.subscribe(this, NGCFT1_Event::recv_data);
@ -169,8 +253,7 @@ void SHA1_NGCFT1::iterate(float delta) {
// if we have not heard for 10sec, timeout
if (it->second.time_since_activity >= 10.f) {
//std::cerr << "SHA1_NGCFT1 warning: sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n";
std::cerr << "SHA1_NGCFT1 warning: sending chunk tansfer timed out " << "." << int(it->first) << "\n";
std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n";
it = peer_it->second.erase(it);
} else {
it++;
@ -184,18 +267,30 @@ void SHA1_NGCFT1::iterate(float delta) {
peer_it++;
}
}
//for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end();) {
//float& time_since_remove_activity = std::get<float>(*it);
//time_since_remove_activity += delta;
//// if we have not heard for 10sec, timeout
//if (time_since_remove_activity >= 10.f) {
//std::cerr << "SHA1 sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n";
//it = _transfers_sending_chunk.erase(it);
//} else {
//it++;
//}
//}
// receiving transfers
for (auto peer_it = _receiving_transfers.begin(); peer_it != _receiving_transfers.end();) {
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
it->second.time_since_activity += delta;
// if we have not heard for 10sec, timeout
if (it->second.time_since_activity >= 10.f) {
std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n";
// TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading
//it->second.v
it = peer_it->second.erase(it);
} else {
it++;
}
}
if (peer_it->second.empty()) {
// cleanup unused peers too agressive?
peer_it = _receiving_transfers.erase(peer_it);
} else {
peer_it++;
}
}
// queued requests
for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) {
@ -225,6 +320,20 @@ void SHA1_NGCFT1::iterate(float delta) {
_contentr.remove<Components::ReRequestInfoTimer>(e);
}
}
{ // requested chunk timers
_contentr.view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
it->second += delta;
// 20sec, TODO: config
if (it->second >= 20.f) {
it = ftchunk_requested.chunks.erase(it);
} else {
it++;
}
}
});
}
}
// if we have not reached the total cap for transfers
@ -244,23 +353,41 @@ void SHA1_NGCFT1::iterate(float delta) {
if (!_queue_requested_chunk.empty()) { // then check for chunk requests
const auto [group_number, peer_number, ce, chunk_hash, _] = _queue_requested_chunk.front();
auto chunk_idx_opt = ce.get<Components::FT1ChunkSHA1Cache>().chunkIndex(chunk_hash);
if (chunk_idx_opt.has_value()) {
const auto& info = ce.get<Components::FT1InfoSHA1>();
auto chunk_idx_vec = ce.get<Components::FT1ChunkSHA1Cache>().chunkIndices(chunk_hash);
if (!chunk_idx_vec.empty()) {
uint8_t transfer_id {0};
if (_nft.NGC_FT1_send_init_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
chunk_hash.data.data(), chunk_hash.size(),
chunkSize(info, chunk_idx_opt.value()),
&transfer_id
)) {
_sending_transfers
[combineIds(group_number, peer_number)]
[transfer_id] // TODO: also save index?
.v = SendingTransfer::Chunk{ce, chunk_idx_opt.value() * info.chunk_size};
// check if already sending
bool already_sending_to_this_peer = false;
if (_sending_transfers.count(combineIds(group_number, peer_number))) {
for (const auto& [_2, t] : _sending_transfers.at(combineIds(group_number, peer_number))) {
if (std::holds_alternative<SendingTransfer::Chunk>(t.v)) {
const auto& v = std::get<SendingTransfer::Chunk>(t.v);
if (v.content == ce && v.chunk_index == chunk_idx_vec.front()) {
// already sending
already_sending_to_this_peer = true;
break;
}
}
}
}
if (!already_sending_to_this_peer) {
const auto& info = ce.get<Components::FT1InfoSHA1>();
uint8_t transfer_id {0};
if (_nft.NGC_FT1_send_init_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
chunk_hash.data.data(), chunk_hash.size(),
chunkSize(info, chunk_idx_vec.front()),
&transfer_id
)) {
_sending_transfers
[combineIds(group_number, peer_number)]
[transfer_id] // TODO: also save index?
.v = SendingTransfer::Chunk{ce, chunk_idx_vec.front()};
}
} // else just remove from queue
}
// remove from queue regardless
_queue_requested_chunk.pop_front();
@ -279,68 +406,9 @@ void SHA1_NGCFT1::iterate(float delta) {
assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
// get a list of peers we can request this file from
// TODO: randomly request from non SuspectedParticipants
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
// TODO: sort by con state?
// prio to direct?
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
}
}
// 1 in 20 chance to ask random peer instead
// TODO: config + tweak
// TODO: save group in content to avoid the tox_peers list build
if (tox_peers.empty() || (_rng()%20) == 0) {
// meh
// HACK: determain group based on last tox_peers
if (!tox_peers.empty()) {
const uint32_t group_number = tox_peers.back().first;
auto gch = _tcm.getContactGroup(group_number);
assert(static_cast<bool>(gch));
std::vector<uint32_t> un_tox_peers;
for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
un_tox_peers.push_back(tgpe.peer_number);
}
}
if (un_tox_peers.empty()) {
// no one online, we are out of luck
} else {
const size_t sample_i = _rng()%un_tox_peers.size();
const auto peer_number = un_tox_peers.at(sample_i);
//const auto& info = msg.get<Components::FT1InfoSHA1>();
const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;
_nft.NGC_FT1_send_request_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
info_hash.data(), info_hash.size()
);
ce.emplace<Components::ReRequestInfoTimer>(0.f);
_queue_content_want_info.pop_front();
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << " (rng)\n";
}
}
} else {
const size_t sample_i = _rng()%tox_peers.size();
const auto [group_number, peer_number] = tox_peers.at(sample_i);
auto selected_peer_opt = selectPeerForRequest(ce);
if (selected_peer_opt.has_value()) {
const auto [group_number, peer_number] = selected_peer_opt.value();
//const auto& info = msg.get<Components::FT1InfoSHA1>();
const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;
@ -357,10 +425,166 @@ void SHA1_NGCFT1::iterate(float delta) {
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
}
} else if (!_queue_content_want_chunk.empty()) {
const auto ce = _queue_content_want_chunk.front();
auto& requested_chunks = ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
if (requested_chunks.size() < _max_pending_requests) {
// select chunk/make sure we still need one
auto selected_peer_opt = selectPeerForRequest(ce);
if (selected_peer_opt.has_value()) {
const auto [group_number, peer_number] = selected_peer_opt.value();
//std::cout << "SHA1_NGCFT1: should ask " << group_number << ":" << peer_number << " for content here\n";
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
const auto& info = ce.get<Components::FT1InfoSHA1>();
// naive, choose first chunk we dont have (double requests!!)
for (size_t chunk_idx = 0; chunk_idx < cc.have_chunk.size(); chunk_idx++) {
if (cc.have_chunk[chunk_idx]) {
continue;
}
// check by hash
if (cc.haveChunk(info.chunks.at(chunk_idx))) {
// TODO: fix this, a completed chunk should fill all the indecies it occupies
cc.have_chunk[chunk_idx] = true;
cc.have_count += 1;
if (cc.have_count == info.chunks.size()) {
cc.have_all = true;
cc.have_chunk.clear();
break;
}
continue;
}
if (requested_chunks.count(chunk_idx)) {
// already requested
continue;
}
// request chunk_idx
_nft.NGC_FT1_send_request_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size()
);
requested_chunks[chunk_idx] = 0.f;
std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n";
break;
}
// ...
// TODO: properly determine
if (!cc.have_all) {
_queue_content_want_chunk.push_back(ce);
}
_queue_content_want_chunk.pop_front();
}
}
}
}
}
bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
// see tox_transfer_manager.cpp for reference
if (!e.e.all_of<Message::Components::Transfer::ActionAccept, Message::Components::Content>()) {
return false;
}
//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path);
auto ce = e.e.get<Message::Components::Content>();
//if (!ce.all_of<Components::FT1InfoSHA1, Components::FT1ChunkSHA1Cache>()) {
if (!ce.all_of<Components::FT1InfoSHA1>()) {
// not ready to load yet, skip
return false;
}
assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
assert(!ce.all_of<Message::Components::Transfer::File>());
// first, open file for write(+readback)
std::string full_file_path{e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path};
// TODO: replace with filesystem or something
// TODO: ensure dir exists
if (full_file_path.back() != '/') {
full_file_path += "/";
}
std::filesystem::create_directories(full_file_path);
const auto& info = ce.get<Components::FT1InfoSHA1>();
full_file_path += info.file_name;
ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{full_file_path});
std::unique_ptr<FileRWFile> file_impl;
const bool file_exists = std::filesystem::exists(full_file_path);
{
const bool truncate = !file_exists;
file_impl = std::make_unique<FileRWFile>(full_file_path, info.file_size, truncate);
}
if (!file_impl->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
//e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop
return false;
}
{ // next, create chuck cache and check for existing data
auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
cc.have_all = false;
cc.have_count = 0;
cc.chunk_hash_to_index.clear(); // if copy pasta
if (file_exists) {
// iterate existing file
for (size_t i = 0; i < info.chunks.size(); i++) {
auto existing_data = file_impl->read(i*info.chunk_size, info.chunkSize(i));
// TODO: avoid copy
cc.have_chunk.push_back(
SHA1Digest{hash_sha1(existing_data.data(), existing_data.size())} == info.chunks.at(i)
);
if (cc.have_chunk.back()) {
cc.have_count += 1;
}
_chunks[info.chunks[i]] = ce;
cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
}
if (cc.have_count == info.chunks.size()) {
cc.have_all = true;
}
} else {
for (size_t i = 0; i < info.chunks.size(); i++) {
cc.have_chunk.push_back(false);
_chunks[info.chunks[i]] = ce;
cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
}
}
if (!cc.have_all) {
// now, enque
_queue_content_want_chunk.push_back(ce);
}
}
ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
ce.remove<Message::Components::Transfer::TagPaused>();
// should?
e.e.remove<Message::Components::Transfer::ActionAccept>();
updateMessages(ce);
return false;
}
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
// only interested in sha1
if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) {
@ -423,7 +647,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
}
assert(msg.all_of<Components::FT1ChunkSHA1Cache>());
assert(ce.all_of<Components::FT1ChunkSHA1Cache>());
if (!ce.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
// we dont have the chunk
@ -445,11 +669,121 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
return false;
}
return false;
// TODO: make sure we requested this?
if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) {
SHA1Digest sha1_info_hash {e.file_id, e.file_id_size};
if (!_info_to_content.count(sha1_info_hash)) {
// no idea about this content
return false;
}
auto ce = _info_to_content.at(sha1_info_hash);
if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data, Components::FT1ChunkSHA1Cache>()) {
// we already have the info (should)
return false;
}
// TODO: check if e.file_size too large / ask for permission
if (e.file_size > 100*1024*1024) {
// a info size of 100MiB is ~640GiB for a 128KiB chunk size (default)
return false;
}
_receiving_transfers
[combineIds(e.group_number, e.peer_number)]
[e.transfer_id]
.v = ReceivingTransfer::Info{ce, std::vector<uint8_t>(e.file_size)};
e.accept = true;
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
SHA1Digest sha1_chunk_hash {e.file_id, e.file_id_size};
if (!_chunks.count(sha1_chunk_hash)) {
// no idea about this content
return false;
}
auto ce = _chunks.at(sha1_chunk_hash);
// CHECK IF TRANSFER IN PROGESS!!
{ // they have the content (probably, might be fake, should move this to done)
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
}
assert(ce.all_of<Components::FT1InfoSHA1>());
assert(ce.all_of<Components::FT1ChunkSHA1Cache>());
const auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
if (cc.haveChunk(sha1_chunk_hash)) {
std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
// we have the chunk
return false;
}
// TODO: cache position
// calc offset_into_file
auto idx_vec = cc.chunkIndices(sha1_chunk_hash);
assert(!idx_vec.empty());
const auto& info = ce.get<Components::FT1InfoSHA1>();
// TODO: check e.file_size
assert(e.file_size == info.chunkSize(idx_vec.front()));
_receiving_transfers
[combineIds(e.group_number, e.peer_number)]
[e.transfer_id]
.v = ReceivingTransfer::Chunk{ce, idx_vec};
e.accept = true;
std::cout << "SHA1_NGCFT1: accepted chunk [" << SHA1Digest{sha1_chunk_hash} << "]\n";
} else {
assert(false && "unhandled case");
}
return true;
}
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
return false;
if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
return false;
}
auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
if (!peer_transfers.count(e.transfer_id)) {
return false;
}
auto& tv = peer_transfers[e.transfer_id].v;
if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
auto& info_data = std::get<ReceivingTransfer::Info>(tv).info_data;
for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
info_data[i+e.data_offset] = e.data[i];
}
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
assert(ce.all_of<Message::Components::Transfer::File>());
auto* file = ce.get<Message::Components::Transfer::File>().get();
assert(file != nullptr);
for (const auto chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
const auto offset_into_file = chunk_index* ce.get<Components::FT1InfoSHA1>().chunk_size;
// TODO: avoid temporary copy
// TODO: check return
file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size});
}
} else {
assert(false && "unhandled case");
}
return true;
}
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
@ -476,8 +810,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
}
} else if (std::holds_alternative<SendingTransfer::Chunk>(transfer.v)) {
auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
// TODO: should we really use file?
const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read(chunk_transfer.offset_into_file + e.data_offset, e.data_size);
const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read((chunk_transfer.chunk_index * info.chunk_size) + e.data_offset, e.data_size);
// TODO: optimize
for (size_t i = 0; i < e.data_size && i < data.size(); i++) {
@ -502,7 +837,119 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
}
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
return false;
if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
return false;
}
auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
if (!peer_transfers.count(e.transfer_id)) {
return false;
}
const auto& tv = peer_transfers[e.transfer_id].v;
if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
auto& info = std::get<ReceivingTransfer::Info>(tv);
auto ce = info.content;
if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data>()) {
// we already have the info, discard
peer_transfers.erase(e.transfer_id);
return true;
}
// check if data matches hash
auto hash = hash_sha1(info.info_data.data(), info.info_data.size());
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
if (ce.get<Components::FT1InfoSHA1Hash>().hash != hash) {
std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n";
// requeue info request
peer_transfers.erase(e.transfer_id);
return true;
}
const auto& info_data = ce.emplace_or_replace<Components::FT1InfoSHA1Data>(std::move(info.info_data)).data;
auto& ft_info = ce.emplace_or_replace<Components::FT1InfoSHA1>();
ft_info.fromBuffer(info_data);
{ // file info
// TODO: not overwrite fi? since same?
auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
file_info.total_size = ft_info.file_size;
}
std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";
ce.remove<Components::ReRequestInfoTimer>();
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) {
_queue_content_want_info.erase(it);
}
ce.emplace_or_replace<Message::Components::Transfer::TagPaused>();
updateMessages(ce);
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
const auto& info = ce.get<Components::FT1InfoSHA1>();
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
// HACK: only check first chunk (they *should* all be the same)
const auto chunk_index = std::get<ReceivingTransfer::Chunk>(tv).chunk_indices.front();
const auto offset_into_file = chunk_index * info.chunk_size;
assert(chunk_index < info.chunks.size());
const auto chunk_size = info.chunkSize(chunk_index);
assert(offset_into_file+chunk_size <= info.file_size);
const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(offset_into_file, chunk_size);
// check hash of chunk
auto got_hash = hash_sha1(chunk_data.data(), chunk_data.size());
if (info.chunks.at(chunk_index) == got_hash) {
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
// remove from requested
// TODO: remove at init and track running transfers differently
for (const auto it : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
}
if (!cc.have_all) {
for (const auto inner_chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
if (!cc.have_all && !cc.have_chunk.at(inner_chunk_index)) {
cc.have_chunk.at(inner_chunk_index) = true;
cc.have_count += 1;
if (cc.have_count == info.chunks.size()) {
// debug check
for ([[maybe_unused]] const bool it : cc.have_chunk) {
assert(it);
}
cc.have_all = true;
cc.have_chunk.clear(); // not wasting memory
std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";
}
// good chunk
// TODO: have wasted + metadata
ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size();
}
}
} else {
std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
}
} else {
// bad chunk
// TODO: requeue?
}
updateMessages(ce); // mostly for received bytes
}
peer_transfers.erase(e.transfer_id);
return true;
}
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
@ -597,6 +1044,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
//ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
}
ce.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
reg_ptr->emplace<Message::Components::Content>(new_msg_e, ce);
ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
@ -713,10 +1161,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
cc.have_count = sha1_info.chunks.size(); // need?
_info_to_content[sha1_info_hash] = ce;
for (size_t i = sha1_info.chunks.size(); i > 0; i--) {
_chunks[sha1_info.chunks[i-1]] = ce;
// chunks can have more then 1 index ..., for now, build reverse and have the first index be the real index
cc.chunk_hash_to_index[sha1_info.chunks[i-1]] = i-1;
cc.chunk_hash_to_index.clear(); // for cpy pst
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
_chunks[sha1_info.chunks[i]] = ce;
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
}
}
@ -740,7 +1188,24 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
ce.emplace<Message::Components::Transfer::BytesSent>(0u);
}
ce.remove<Message::Components::Transfer::TagPaused>();
// we dont want the info anymore
ce.remove<Components::ReRequestInfoTimer>();
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) {
_queue_content_want_info.erase(it);
}
// TODO: we dont want chunks anymore
// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
auto it = _queue_content_want_chunk.begin();
while (
it != _queue_content_want_chunk.end() &&
(it = std::find(it, _queue_content_want_chunk.end(), ce)) != _queue_content_want_chunk.end()
) {
it = _queue_content_want_chunk.erase(it);
}
} else {
ce = {_contentr, _contentr.create()};
_info_to_content[sha1_info_hash] = ce;
@ -756,9 +1221,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
cc.have_count = sha1_info.chunks.size(); // need?
_info_to_content[sha1_info_hash] = ce;
cc.chunk_hash_to_index.clear(); // for cpy pst
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
_chunks[sha1_info.chunks[i]] = ce;
cc.chunk_hash_to_index[sha1_info.chunks[i]] = i;
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
}
}
@ -782,7 +1248,6 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
return true;
}
const auto msg_e = reg_ptr->create();
reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
@ -798,6 +1263,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
// file id would be sha1_info hash or something
//reg_ptr->emplace<Message::Components::Transfer::FileID>(e, file_id);
// remove? done in updateMessages() anyway
if (ce.all_of<Message::Components::Transfer::FileInfo>()) {
reg_ptr->emplace<Message::Components::Transfer::FileInfo>(msg_e, ce.get<Message::Components::Transfer::FileInfo>());
}
@ -811,17 +1277,6 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
// TODO: determine if this is true
//reg_ptr->emplace<Message::Components::Transfer::TagPaused>(e);
#if 0
const auto friend_number = _cr.get<Contact::Components::ToxFriendEphemeral>(c).friend_number;
const auto&& [transfer_id, err] = _t.toxFileSend(friend_number, file_kind, file_impl->_file_size, file_id, file_name);
if (err == TOX_ERR_FILE_SEND_OK) {
reg_ptr->emplace<Message::Components::Transfer::ToxTransferFriend>(e, friend_number, transfer_id.value());
// TODO: add tag signifying init sent status?
toxFriendLookupAdd({*reg_ptr, e});
} // else queue?
#endif
if (_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
const uint32_t group_number = _cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
uint32_t message_id = 0;

View File

@ -37,6 +37,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
// sha1 chunk index
// TODO: optimize lookup
// TODO: multiple contents. hashes might be unique, but data is not
entt::dense_map<SHA1Digest, ContentHandle> _chunks;
// group_number, peer_number, content, chunk_hash, timer
@ -53,7 +54,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
struct Chunk {
ContentHandle content;
uint64_t offset_into_file;
size_t chunk_index; // <.< remove offset_into_file
//uint64_t offset_into_file;
// or data?
// if memmapped, this would be just a pointer
};
@ -75,7 +77,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
struct Chunk {
ContentHandle content;
uint64_t offset_into_file;
std::vector<size_t> chunk_indices;
// or data?
// if memmapped, this would be just a pointer
};
@ -95,11 +97,15 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
void updateMessages(ContentHandle ce);
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ContentHandle ce);
public: // TODO: config
bool _udp_only {false};
size_t _max_concurrent_in {8};
size_t _max_concurrent_out {4};
size_t _max_concurrent_in {4};
size_t _max_concurrent_out {6};
// TODO: probably also includes running transfers rn (meh)
size_t _max_pending_requests {16}; // per content
public:
SHA1_NGCFT1(
@ -111,6 +117,9 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
void iterate(float delta);
protected: // rmm events (actions)
bool onEvent(const Message::Events::MessageUpdated&) override;
protected: // events
bool onEvent(const Events::NGCFT1_recv_request&) override;
bool onEvent(const Events::NGCFT1_recv_init&) override;