This commit is contained in:
Green Sky 2023-08-08 18:13:04 +02:00
parent 47b89ae577
commit 9ea1fbd725
No known key found for this signature in database

View File

@ -1,9 +1,12 @@
#include "./ngcft1.hpp" #include "./ngcft1.hpp"
#include <solanaceae/toxcore/utils.hpp>
#include <iostream> #include <iostream>
#include <set> #include <set>
#include <algorithm> #include <algorithm>
#include <cassert> #include <cassert>
#include <vector>
bool NGCFT1::sendPKG_FT1_REQUEST( bool NGCFT1::sendPKG_FT1_REQUEST(
uint32_t group_number, uint32_t peer_number, uint32_t group_number, uint32_t peer_number,
@ -128,11 +131,11 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
if (tf.time_since_activity >= init_retry_timeout_after) { if (tf.time_since_activity >= init_retry_timeout_after) {
if (tf.inits_sent >= 3) { if (tf.inits_sent >= 3) {
// delete, timed out 3 times // delete, timed out 3 times
fprintf(stderr, "FT: warning, ft init timed out, deleting\n"); std::cerr << "NGCFT1 warning: ft init timed out, deleting\n";
tf_opt.reset(); tf_opt.reset();
} else { } else {
// timed out, resend // timed out, resend
fprintf(stderr, "FT: warning, ft init timed out, resending\n"); std::cerr << "NGCFT1 warning: ft init timed out, resending\n";
sendPKG_FT1_INIT(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size()); sendPKG_FT1_INIT(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size());
tf.inits_sent++; tf.inits_sent++;
tf.time_since_activity = 0.f; tf.time_since_activity = 0.f;
@ -156,7 +159,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
if (tf.time_since_activity >= sending_give_up_after) { if (tf.time_since_activity >= sending_give_up_after) {
// no ack after 30sec, close ft // no ack after 30sec, close ft
// TODO: notify app // TODO: notify app
fprintf(stderr, "FT: warning, sending ft in progress timed out, deleting\n"); std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting\n";
// clean up cca // clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) { tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
@ -247,7 +250,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
if (tf.time_since_activity >= sending_give_up_after) { if (tf.time_since_activity >= sending_give_up_after) {
// no ack after 30sec, close ft // no ack after 30sec, close ft
// TODO: notify app // TODO: notify app
fprintf(stderr, "FT: warning, sending ft finishing timed out, deleting\n"); std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
// clean up cca // clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) { tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
@ -259,7 +262,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
} }
break; break;
default: // invalid state, delete default: // invalid state, delete
fprintf(stderr, "FT: error, ft in invalid state, deleting\n"); std::cerr << "NGCFT1 error: ft in invalid state, deleting\n";
tf_opt.reset(); tf_opt.reset();
//continue; //continue;
return; return;
@ -305,6 +308,8 @@ void NGCFT1::NGC_FT1_send_request_private(
uint32_t file_kind, uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size const uint8_t* file_id, size_t file_id_size
) { ) {
// TODO: error check
sendPKG_FT1_REQUEST(group_number, peer_number, file_kind, file_id, file_id_size);
} }
bool NGCFT1::NGC_FT1_send_init_private( bool NGCFT1::NGC_FT1_send_init_private(
@ -314,31 +319,241 @@ bool NGCFT1::NGC_FT1_send_init_private(
size_t file_size, size_t file_size,
uint8_t* transfer_id uint8_t* transfer_id
) { ) {
return false; if (std::get<0>(_t.toxGroupPeerGetConnectionStatus(group_number, peer_number)).value_or(TOX_CONNECTION_NONE) == TOX_CONNECTION_NONE) {
std::cerr << "NGCFT1 error: cant init ft, peer offline\n";
return false;
}
auto& peer = groups[group_number].peers[peer_number];
// allocate transfer_id
size_t idx = peer.next_send_transfer_idx;
peer.next_send_transfer_idx = (peer.next_send_transfer_idx + 1) % 256;
{ // TODO: extract
size_t i = idx;
bool found = false;
do {
if (!peer.send_transfers[i].has_value()) {
// free slot
idx = i;
found = true;
break;
}
i = (i + 1) % 256;
} while (i != idx);
if (!found) {
std::cerr << "NGCFT1 error: cant init ft, no free transfer slot\n";
return false;
}
}
// TODO: check return value
sendPKG_FT1_INIT(group_number, peer_number, file_kind, file_size, idx, file_id, file_id_size);
peer.send_transfers[idx] = Group::Peer::SendTransfer{
file_kind,
std::vector(file_id, file_id+file_id_size),
Group::Peer::SendTransfer::State::INIT_SENT,
1,
0.f,
file_size,
0,
{}, // ssb
};
if (transfer_id != nullptr) {
*transfer_id = idx;
}
return true;
} }
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request&) { bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) {
std::cout << "NGCFT1: FT1_REQUEST\n"; std::cout << "NGCFT1: FT1_REQUEST fk:" << e.file_kind << " [" << bin2hex(e.file_id) << "]\n";
return false;
// .... just rethrow??
// TODO: dont
return dispatch(
NGCFT1_Event::recv_request,
Events::NGCFT1_recv_request{
e.group_number, e.peer_number,
static_cast<NGCFT1_file_kind>(e.file_kind),
e.file_id.data(), e.file_id.size()
}
);
} }
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init&) { bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
std::cout << "NGCFT1: FT1_INIT\n"; std::cout << "NGCFT1: FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << e.transfer_id << " [" << bin2hex(e.file_id) << "]\n";
return false;
bool accept = false;
dispatch(
NGCFT1_Event::recv_init,
Events::NGCFT1_recv_init{
e.group_number, e.peer_number,
static_cast<NGCFT1_file_kind>(e.file_kind),
e.file_id.data(), e.file_id.size(),
e.transfer_id,
e.file_size,
accept
}
);
if (!accept) {
std::cout << "NGCFT1: rejected init\n";
return true; // return true?
}
sendPKG_FT1_INIT_ACK(e.group_number, e.peer_number, e.transfer_id);
std::cout << "NGCFT1: accepted init\n";
auto& peer = groups[e.group_number].peers[e.peer_number];
if (peer.recv_transfers[e.transfer_id].has_value()) {
std::cerr << "NGCFT1 warning: overwriting existing recv_transfer " << e.transfer_id << "\n";
}
peer.recv_transfers[e.transfer_id] = Group::Peer::RecvTransfer{
e.file_kind,
e.file_id,
Group::Peer::RecvTransfer::State::INITED,
e.file_size,
0u,
{} // rsb
};
return true;
} }
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack&) { bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
std::cout << "NGCFT1: FT1_INIT_ACK\n"; std::cout << "NGCFT1: FT1_INIT_ACK\n";
return false;
// we now should start sending data
if (!groups.count(e.group_number)) {
std::cerr << "NGCFT1 warning: init_ack for unknown group\n";
return true;
}
Group::Peer& peer = groups[e.group_number].peers[e.peer_number];
if (!peer.send_transfers[e.transfer_id].has_value()) {
std::cerr << "NGCFT1 warning: init_ack for unknown transfer\n";
return true;
}
Group::Peer::SendTransfer& transfer = peer.send_transfers[e.transfer_id].value();
using State = Group::Peer::SendTransfer::State;
if (transfer.state != State::INIT_SENT) {
std::cerr << "NGCFT1 error: inti_ack but not in INIT_SENT state\n";
return true;
}
// iterate will now call NGC_FT1_send_data_cb
transfer.state = State::SENDING;
transfer.time_since_activity = 0.f;
return true;
} }
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data&) { bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
std::cout << "NGCFT1: FT1_DATA\n"; std::cout << "NGCFT1: FT1_DATA\n";
return false;
if (e.data.empty()) {
std::cerr << "NGCFT1 error: data of size 0!\n";
return true;
}
if (!groups.count(e.group_number)) {
std::cerr << "NGCFT1 warning: data for unknown group\n";
return true;
}
Group::Peer& peer = groups[e.group_number].peers[e.peer_number];
if (!peer.recv_transfers[e.transfer_id].has_value()) {
std::cerr << "NGCFT1 warning: data for unknown transfer\n";
return true;
}
auto& transfer = peer.recv_transfers[e.transfer_id].value();
// do reassembly, ignore dups
transfer.rsb.add(e.sequence_id, std::vector<uint8_t>(e.data)); // TODO: ugly explicit copy for what should just be a move
// loop for chunks without holes
while (transfer.rsb.canPop()) {
auto data = transfer.rsb.pop();
// TODO: check return value
dispatch(
NGCFT1_Event::recv_data,
Events::NGCFT1_recv_data{
e.group_number, e.peer_number,
e.transfer_id,
transfer.file_size_current,
data.data(), data.size()
}
);
transfer.file_size_current += data.size();
}
// send acks
std::vector<uint16_t> ack_seq_ids(transfer.rsb.ack_seq_ids.cbegin(), transfer.rsb.ack_seq_ids.cend());
// TODO: check if this caps at max acks
if (!ack_seq_ids.empty()) {
// TODO: check return value
sendPKG_FT1_DATA_ACK(e.group_number, e.peer_number, e.transfer_id, ack_seq_ids.data(), ack_seq_ids.size());
}
return true;
} }
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack&) { bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
std::cout << "NGCFT1: FT1_DATA_ACK\n"; std::cout << "NGCFT1: FT1_DATA_ACK\n";
return false;
if (!groups.count(e.group_number)) {
std::cerr << "NGCFT1 warning: data_ack for unknown group\n";
return true;
}
Group::Peer& peer = groups[e.group_number].peers[e.peer_number];
if (!peer.send_transfers[e.transfer_id].has_value()) {
std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n";
return true;
}
Group::Peer::SendTransfer& transfer = peer.send_transfers[e.transfer_id].value();
using State = Group::Peer::SendTransfer::State;
if (transfer.state != State::SENDING && transfer.state != State::FINISHING) {
std::cerr << "NGCFT1 error: data_ack but not in SENDING or FINISHING state (" << int(transfer.state) << ")\n";
return true;
}
//if ((length - curser) % sizeof(uint16_t) != 0) {
//fprintf(stderr, "FT: data_ack with misaligned data\n");
//return;
//}
transfer.time_since_activity = 0.f;
std::vector<LEDBAT::SeqIDType> seqs;
for (const auto it : e.sequence_ids) {
// TODO: improve this o.o
seqs.push_back({e.transfer_id, it});
transfer.ssb.erase(it);
}
peer.cca.onAck(seqs);
// delete if all packets acked
if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) {
std::cout << "NGCFT1: " << e.transfer_id << " done\n";
peer.send_transfers[e.transfer_id].reset();
}
return true;
} }