packet assembly and iterate mostly done
This commit is contained in:
parent
7f361cbc4b
commit
a9f1b048fd
284
src/ngcft1.cpp
284
src/ngcft1.cpp
@ -1,9 +1,279 @@
|
||||
#include "./ngcft1.hpp"
|
||||
|
||||
#include <iostream>
|
||||
#include <set>
|
||||
#include <algorithm>
|
||||
#include <cassert>
|
||||
|
||||
bool NGCFT1::sendPKG_FT1_REQUEST(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
uint32_t file_kind,
|
||||
const uint8_t* file_id, size_t file_id_size
|
||||
) {
|
||||
// - 1 byte packet id
|
||||
// - 4 byte file_kind
|
||||
// - X bytes file_id
|
||||
std::vector<uint8_t> pkg;
|
||||
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_REQUEST));
|
||||
for (size_t i = 0; i < sizeof(file_kind); i++) {
|
||||
pkg.push_back((file_kind>>(i*8)) & 0xff);
|
||||
}
|
||||
for (size_t i = 0; i < file_id_size; i++) {
|
||||
pkg.push_back(file_id[i]);
|
||||
}
|
||||
|
||||
// lossless
|
||||
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
|
||||
}
|
||||
|
||||
bool NGCFT1::sendPKG_FT1_INIT(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
uint32_t file_kind,
|
||||
uint64_t file_size,
|
||||
uint8_t transfer_id,
|
||||
const uint8_t* file_id, size_t file_id_size
|
||||
) {
|
||||
// - 1 byte packet id
|
||||
// - 4 byte (file_kind)
|
||||
// - 8 bytes (data size)
|
||||
// - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts)
|
||||
// - X bytes (file_kind dependent id, differnt sizes)
|
||||
|
||||
std::vector<uint8_t> pkg;
|
||||
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_INIT));
|
||||
for (size_t i = 0; i < sizeof(file_kind); i++) {
|
||||
pkg.push_back((file_kind>>(i*8)) & 0xff);
|
||||
}
|
||||
for (size_t i = 0; i < sizeof(file_size); i++) {
|
||||
pkg.push_back((file_size>>(i*8)) & 0xff);
|
||||
}
|
||||
pkg.push_back(transfer_id);
|
||||
for (size_t i = 0; i < file_id_size; i++) {
|
||||
pkg.push_back(file_id[i]);
|
||||
}
|
||||
|
||||
// lossless
|
||||
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
|
||||
}
|
||||
|
||||
bool NGCFT1::sendPKG_FT1_INIT_ACK(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
uint8_t transfer_id
|
||||
) {
|
||||
// - 1 byte packet id
|
||||
// - 1 byte transfer_id
|
||||
std::vector<uint8_t> pkg;
|
||||
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_INIT_ACK));
|
||||
pkg.push_back(transfer_id);
|
||||
|
||||
// lossless
|
||||
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
|
||||
}
|
||||
|
||||
bool NGCFT1::sendPKG_FT1_DATA(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
uint8_t transfer_id,
|
||||
uint16_t sequence_id,
|
||||
const uint8_t* data, size_t data_size
|
||||
) {
|
||||
assert(data_size > 0);
|
||||
|
||||
// TODO
|
||||
// check header_size+data_size <= max pkg size
|
||||
|
||||
std::vector<uint8_t> pkg;
|
||||
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA));
|
||||
pkg.push_back(transfer_id);
|
||||
pkg.push_back(sequence_id & 0xff);
|
||||
pkg.push_back((sequence_id >> (1*8)) & 0xff);
|
||||
|
||||
// TODO: optimize
|
||||
for (size_t i = 0; i < data_size; i++) {
|
||||
pkg.push_back(data[i]);
|
||||
}
|
||||
|
||||
// lossy
|
||||
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, false, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
|
||||
}
|
||||
|
||||
bool NGCFT1::sendPKG_FT1_DATA_ACK(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
uint8_t transfer_id,
|
||||
const uint16_t* seq_ids, size_t seq_ids_size
|
||||
) {
|
||||
std::vector<uint8_t> pkg;
|
||||
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA_ACK));
|
||||
pkg.push_back(transfer_id);
|
||||
|
||||
// TODO: optimize
|
||||
for (size_t i = 0; i < seq_ids_size; i++) {
|
||||
pkg.push_back(seq_ids[i] & 0xff);
|
||||
pkg.push_back((seq_ids[i] >> (1*8)) & 0xff);
|
||||
}
|
||||
|
||||
// lossy
|
||||
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, false, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
|
||||
}
|
||||
|
||||
void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<LEDBAT::SeqIDType>& timeouts_set) {
|
||||
auto& tf_opt = peer.send_transfers.at(idx);
|
||||
assert(tf_opt.has_value());
|
||||
auto& tf = tf_opt.value();
|
||||
|
||||
tf.time_since_activity += time_delta;
|
||||
|
||||
switch (tf.state) {
|
||||
using State = Group::Peer::SendTransfer::State;
|
||||
case State::INIT_SENT:
|
||||
if (tf.time_since_activity >= init_retry_timeout_after) {
|
||||
if (tf.inits_sent >= 3) {
|
||||
// delete, timed out 3 times
|
||||
fprintf(stderr, "FT: warning, ft init timed out, deleting\n");
|
||||
tf_opt.reset();
|
||||
} else {
|
||||
// timed out, resend
|
||||
fprintf(stderr, "FT: warning, ft init timed out, resending\n");
|
||||
sendPKG_FT1_INIT(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size());
|
||||
tf.inits_sent++;
|
||||
tf.time_since_activity = 0.f;
|
||||
}
|
||||
}
|
||||
//break;
|
||||
return;
|
||||
case State::SENDING: {
|
||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||
// no ack after 5 sec -> resend
|
||||
//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
|
||||
if (timeouts_set.count({idx, id})) {
|
||||
// TODO: can fail
|
||||
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
|
||||
peer.cca.onLoss({idx, id}, false);
|
||||
time_since_activity = 0.f;
|
||||
timeouts_set.erase({idx, id});
|
||||
}
|
||||
});
|
||||
|
||||
if (tf.time_since_activity >= sending_give_up_after) {
|
||||
// no ack after 30sec, close ft
|
||||
// TODO: notify app
|
||||
fprintf(stderr, "FT: warning, sending ft in progress timed out, deleting\n");
|
||||
|
||||
// clean up cca
|
||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||
peer.cca.onLoss({idx, id}, true);
|
||||
timeouts_set.erase({idx, id});
|
||||
});
|
||||
|
||||
tf_opt.reset();
|
||||
//continue; // dangerous control flow
|
||||
return;
|
||||
}
|
||||
|
||||
assert(ngc_ft1_ctx->cb_send_data.count(tf.file_kind));
|
||||
|
||||
// if chunks in flight < window size (2)
|
||||
//while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) {
|
||||
int64_t can_packet_size {static_cast<int64_t>(peer.cca.canSend())};
|
||||
//if (can_packet_size) {
|
||||
//std::cerr << "FT: can_packet_size: " << can_packet_size;
|
||||
//}
|
||||
size_t count {0};
|
||||
while (can_packet_size > 0 && tf.file_size > 0) {
|
||||
std::vector<uint8_t> new_data;
|
||||
|
||||
// TODO: parameterize packet size? -> only if JF increases lossy packet size >:)
|
||||
//size_t chunk_size = std::min<size_t>(496u, tf.file_size - tf.file_size_current);
|
||||
//size_t chunk_size = std::min<size_t>(can_packet_size, tf.file_size - tf.file_size_current);
|
||||
size_t chunk_size = std::min<size_t>({
|
||||
//496u,
|
||||
//996u,
|
||||
peer.cca.MAXIMUM_SEGMENT_DATA_SIZE,
|
||||
static_cast<size_t>(can_packet_size),
|
||||
tf.file_size - tf.file_size_current
|
||||
});
|
||||
if (chunk_size == 0) {
|
||||
tf.state = State::FINISHING;
|
||||
break; // we done
|
||||
}
|
||||
|
||||
new_data.resize(chunk_size);
|
||||
|
||||
ngc_ft1_ctx->cb_send_data[tf.file_kind](
|
||||
tox,
|
||||
group_number, peer_number,
|
||||
idx,
|
||||
tf.file_size_current,
|
||||
new_data.data(), new_data.size(),
|
||||
ngc_ft1_ctx->ud_send_data.count(tf.file_kind) ? ngc_ft1_ctx->ud_send_data.at(tf.file_kind) : nullptr
|
||||
);
|
||||
uint16_t seq_id = tf.ssb.add(std::move(new_data));
|
||||
sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
|
||||
peer.cca.onSent({idx, seq_id}, chunk_size);
|
||||
|
||||
#if defined(EXTRA_LOGGING) && EXTRA_LOGGING == 1
|
||||
fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id);
|
||||
#endif
|
||||
|
||||
tf.file_size_current += chunk_size;
|
||||
can_packet_size -= chunk_size;
|
||||
count++;
|
||||
}
|
||||
//if (count) {
|
||||
//std::cerr << " split over " << count << "\n";
|
||||
//}
|
||||
}
|
||||
break;
|
||||
case State::FINISHING: // we still have unacked packets
|
||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||
// no ack after 5 sec -> resend
|
||||
//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
|
||||
if (timeouts_set.count({idx, id})) {
|
||||
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
|
||||
peer.cca.onLoss({idx, id}, false);
|
||||
time_since_activity = 0.f;
|
||||
timeouts_set.erase({idx, id});
|
||||
}
|
||||
});
|
||||
if (tf.time_since_activity >= sending_give_up_after) {
|
||||
// no ack after 30sec, close ft
|
||||
// TODO: notify app
|
||||
fprintf(stderr, "FT: warning, sending ft finishing timed out, deleting\n");
|
||||
|
||||
// clean up cca
|
||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||
peer.cca.onLoss({idx, id}, true);
|
||||
timeouts_set.erase({idx, id});
|
||||
});
|
||||
|
||||
tf_opt.reset();
|
||||
}
|
||||
break;
|
||||
default: // invalid state, delete
|
||||
fprintf(stderr, "FT: error, ft in invalid state, deleting\n");
|
||||
tf_opt.reset();
|
||||
//continue;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
|
||||
auto timeouts = peer.cca.getTimeouts();
|
||||
std::set<LEDBAT::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
|
||||
|
||||
for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) {
|
||||
if (peer.send_transfers.at(idx).has_value()) {
|
||||
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: receiving tranfers?
|
||||
}
|
||||
|
||||
NGCFT1::NGCFT1(
|
||||
ToxI& t,
|
||||
ToxEventProviderI& tep,
|
||||
NGCEXTEventProviderI& neep
|
||||
) : _tep(tep), _neep(neep)
|
||||
) : _t(t), _tep(tep), _neep(neep)
|
||||
{
|
||||
_neep.subscribe(this, NGCEXT_Event::FT1_REQUEST);
|
||||
_neep.subscribe(this, NGCEXT_Event::FT1_INIT);
|
||||
@ -12,7 +282,12 @@ NGCFT1::NGCFT1(
|
||||
_neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK);
|
||||
}
|
||||
|
||||
void NGCFT1::iterate(float delta) {
|
||||
void NGCFT1::iterate(float time_delta) {
|
||||
for (auto& [group_number, group] : groups) {
|
||||
for (auto& [peer_number, peer] : group.peers) {
|
||||
iteratePeer(time_delta, group_number, peer_number, peer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void NGCFT1::NGC_FT1_send_request_private(
|
||||
@ -33,22 +308,27 @@ bool NGCFT1::NGC_FT1_send_init_private(
|
||||
}
|
||||
|
||||
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request&) {
|
||||
std::cout << "NGCFT1: FT1_REQUEST\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init&) {
|
||||
std::cout << "NGCFT1: FT1_INIT\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack&) {
|
||||
std::cout << "NGCFT1: FT1_INIT_ACK\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data&) {
|
||||
std::cout << "NGCFT1: FT1_DATA\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack&) {
|
||||
std::cout << "NGCFT1: FT1_DATA_ACK\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
// solanaceae port of tox_ngc_ft1
|
||||
|
||||
#include <solanaceae/toxcore/tox_interface.hpp>
|
||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||
|
||||
#include "./ngcext.hpp"
|
||||
@ -11,6 +12,7 @@
|
||||
#include "./snd_buf.hpp"
|
||||
|
||||
#include <map>
|
||||
#include <set>
|
||||
|
||||
// uint32_t - same as tox friend ft
|
||||
// TODO: fill in toxfriend file kinds
|
||||
@ -127,6 +129,7 @@ enum class NGCFT1_file_kind : uint32_t {
|
||||
|
||||
|
||||
class NGCFT1 : public ToxEventI, public NGCEXTEventI {
|
||||
ToxI& _t;
|
||||
ToxEventProviderI& _tep;
|
||||
NGCEXTEventProviderI& _neep;
|
||||
|
||||
@ -190,9 +193,19 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI {
|
||||
};
|
||||
std::map<uint32_t, Group> groups;
|
||||
|
||||
protected:
|
||||
bool sendPKG_FT1_REQUEST(uint32_t group_number, uint32_t peer_number, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size);
|
||||
bool sendPKG_FT1_INIT(uint32_t group_number, uint32_t peer_number, uint32_t file_kind, uint64_t file_size, uint8_t transfer_id, const uint8_t* file_id, size_t file_id_size);
|
||||
bool sendPKG_FT1_INIT_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id);
|
||||
bool sendPKG_FT1_DATA(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, uint16_t sequence_id, const uint8_t* data, size_t data_size);
|
||||
bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size);
|
||||
|
||||
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<LEDBAT::SeqIDType>& timeouts_set);
|
||||
void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
|
||||
|
||||
public:
|
||||
NGCFT1(
|
||||
ToxI& t,
|
||||
ToxEventProviderI& tep,
|
||||
NGCEXTEventProviderI& neep
|
||||
);
|
||||
|
Loading…
Reference in New Issue
Block a user