diff --git a/src/ngcft1.cpp b/src/ngcft1.cpp index e18511e..6726a9a 100644 --- a/src/ngcft1.cpp +++ b/src/ngcft1.cpp @@ -1,9 +1,279 @@ #include "./ngcft1.hpp" +#include +#include +#include +#include + +bool NGCFT1::sendPKG_FT1_REQUEST( + uint32_t group_number, uint32_t peer_number, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size +) { + // - 1 byte packet id + // - 4 byte file_kind + // - X bytes file_id + std::vector pkg; + pkg.push_back(static_cast(NGCEXT_Event::FT1_REQUEST)); + for (size_t i = 0; i < sizeof(file_kind); i++) { + pkg.push_back((file_kind>>(i*8)) & 0xff); + } + for (size_t i = 0; i < file_id_size; i++) { + pkg.push_back(file_id[i]); + } + + // lossless + return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; +} + +bool NGCFT1::sendPKG_FT1_INIT( + uint32_t group_number, uint32_t peer_number, + uint32_t file_kind, + uint64_t file_size, + uint8_t transfer_id, + const uint8_t* file_id, size_t file_id_size +) { + // - 1 byte packet id + // - 4 byte (file_kind) + // - 8 bytes (data size) + // - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts) + // - X bytes (file_kind dependent id, differnt sizes) + + std::vector pkg; + pkg.push_back(static_cast(NGCEXT_Event::FT1_INIT)); + for (size_t i = 0; i < sizeof(file_kind); i++) { + pkg.push_back((file_kind>>(i*8)) & 0xff); + } + for (size_t i = 0; i < sizeof(file_size); i++) { + pkg.push_back((file_size>>(i*8)) & 0xff); + } + pkg.push_back(transfer_id); + for (size_t i = 0; i < file_id_size; i++) { + pkg.push_back(file_id[i]); + } + + // lossless + return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; +} + +bool NGCFT1::sendPKG_FT1_INIT_ACK( + uint32_t group_number, uint32_t peer_number, + uint8_t transfer_id +) { + // - 1 byte packet id + // - 1 byte transfer_id + std::vector pkg; + pkg.push_back(static_cast(NGCEXT_Event::FT1_INIT_ACK)); + pkg.push_back(transfer_id); + + // lossless + return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; +} + +bool NGCFT1::sendPKG_FT1_DATA( + uint32_t group_number, uint32_t peer_number, + uint8_t transfer_id, + uint16_t sequence_id, + const uint8_t* data, size_t data_size +) { + assert(data_size > 0); + + // TODO + // check header_size+data_size <= max pkg size + + std::vector pkg; + pkg.push_back(static_cast(NGCEXT_Event::FT1_DATA)); + pkg.push_back(transfer_id); + pkg.push_back(sequence_id & 0xff); + pkg.push_back((sequence_id >> (1*8)) & 0xff); + + // TODO: optimize + for (size_t i = 0; i < data_size; i++) { + pkg.push_back(data[i]); + } + + // lossy + return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, false, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; +} + +bool NGCFT1::sendPKG_FT1_DATA_ACK( + uint32_t group_number, uint32_t peer_number, + uint8_t transfer_id, + const uint16_t* seq_ids, size_t seq_ids_size +) { + std::vector pkg; + pkg.push_back(static_cast(NGCEXT_Event::FT1_DATA_ACK)); + pkg.push_back(transfer_id); + + // TODO: optimize + for (size_t i = 0; i < seq_ids_size; i++) { + pkg.push_back(seq_ids[i] & 0xff); + pkg.push_back((seq_ids[i] >> (1*8)) & 0xff); + } + + // lossy + return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, false, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; +} + +void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set) { + auto& tf_opt = peer.send_transfers.at(idx); + assert(tf_opt.has_value()); + auto& tf = tf_opt.value(); + + tf.time_since_activity += time_delta; + + switch (tf.state) { + using State = Group::Peer::SendTransfer::State; + case State::INIT_SENT: + if (tf.time_since_activity >= init_retry_timeout_after) { + if (tf.inits_sent >= 3) { + // delete, timed out 3 times + fprintf(stderr, "FT: warning, ft init timed out, deleting\n"); + tf_opt.reset(); + } else { + // timed out, resend + fprintf(stderr, "FT: warning, ft init timed out, resending\n"); + sendPKG_FT1_INIT(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size()); + tf.inits_sent++; + tf.time_since_activity = 0.f; + } + } + //break; + return; + case State::SENDING: { + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + // no ack after 5 sec -> resend + //if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) { + if (timeouts_set.count({idx, id})) { + // TODO: can fail + sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size()); + peer.cca.onLoss({idx, id}, false); + time_since_activity = 0.f; + timeouts_set.erase({idx, id}); + } + }); + + if (tf.time_since_activity >= sending_give_up_after) { + // no ack after 30sec, close ft + // TODO: notify app + fprintf(stderr, "FT: warning, sending ft in progress timed out, deleting\n"); + + // clean up cca + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + peer.cca.onLoss({idx, id}, true); + timeouts_set.erase({idx, id}); + }); + + tf_opt.reset(); + //continue; // dangerous control flow + return; + } + + assert(ngc_ft1_ctx->cb_send_data.count(tf.file_kind)); + + // if chunks in flight < window size (2) + //while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) { + int64_t can_packet_size {static_cast(peer.cca.canSend())}; + //if (can_packet_size) { + //std::cerr << "FT: can_packet_size: " << can_packet_size; + //} + size_t count {0}; + while (can_packet_size > 0 && tf.file_size > 0) { + std::vector new_data; + + // TODO: parameterize packet size? -> only if JF increases lossy packet size >:) + //size_t chunk_size = std::min(496u, tf.file_size - tf.file_size_current); + //size_t chunk_size = std::min(can_packet_size, tf.file_size - tf.file_size_current); + size_t chunk_size = std::min({ + //496u, + //996u, + peer.cca.MAXIMUM_SEGMENT_DATA_SIZE, + static_cast(can_packet_size), + tf.file_size - tf.file_size_current + }); + if (chunk_size == 0) { + tf.state = State::FINISHING; + break; // we done + } + + new_data.resize(chunk_size); + + ngc_ft1_ctx->cb_send_data[tf.file_kind]( + tox, + group_number, peer_number, + idx, + tf.file_size_current, + new_data.data(), new_data.size(), + ngc_ft1_ctx->ud_send_data.count(tf.file_kind) ? ngc_ft1_ctx->ud_send_data.at(tf.file_kind) : nullptr + ); + uint16_t seq_id = tf.ssb.add(std::move(new_data)); + sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size()); + peer.cca.onSent({idx, seq_id}, chunk_size); + +#if defined(EXTRA_LOGGING) && EXTRA_LOGGING == 1 + fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id); +#endif + + tf.file_size_current += chunk_size; + can_packet_size -= chunk_size; + count++; + } + //if (count) { + //std::cerr << " split over " << count << "\n"; + //} + } + break; + case State::FINISHING: // we still have unacked packets + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + // no ack after 5 sec -> resend + //if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) { + if (timeouts_set.count({idx, id})) { + sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size()); + peer.cca.onLoss({idx, id}, false); + time_since_activity = 0.f; + timeouts_set.erase({idx, id}); + } + }); + if (tf.time_since_activity >= sending_give_up_after) { + // no ack after 30sec, close ft + // TODO: notify app + fprintf(stderr, "FT: warning, sending ft finishing timed out, deleting\n"); + + // clean up cca + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + peer.cca.onLoss({idx, id}, true); + timeouts_set.erase({idx, id}); + }); + + tf_opt.reset(); + } + break; + default: // invalid state, delete + fprintf(stderr, "FT: error, ft in invalid state, deleting\n"); + tf_opt.reset(); + //continue; + return; + } +} + +void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) { + auto timeouts = peer.cca.getTimeouts(); + std::set timeouts_set{timeouts.cbegin(), timeouts.cend()}; + + for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) { + if (peer.send_transfers.at(idx).has_value()) { + updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set); + } + } + + // TODO: receiving tranfers? +} + NGCFT1::NGCFT1( + ToxI& t, ToxEventProviderI& tep, NGCEXTEventProviderI& neep -) : _tep(tep), _neep(neep) +) : _t(t), _tep(tep), _neep(neep) { _neep.subscribe(this, NGCEXT_Event::FT1_REQUEST); _neep.subscribe(this, NGCEXT_Event::FT1_INIT); @@ -12,7 +282,12 @@ NGCFT1::NGCFT1( _neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK); } -void NGCFT1::iterate(float delta) { +void NGCFT1::iterate(float time_delta) { + for (auto& [group_number, group] : groups) { + for (auto& [peer_number, peer] : group.peers) { + iteratePeer(time_delta, group_number, peer_number, peer); + } + } } void NGCFT1::NGC_FT1_send_request_private( @@ -33,22 +308,27 @@ bool NGCFT1::NGC_FT1_send_init_private( } bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request&) { + std::cout << "NGCFT1: FT1_REQUEST\n"; return false; } bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init&) { + std::cout << "NGCFT1: FT1_INIT\n"; return false; } bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack&) { + std::cout << "NGCFT1: FT1_INIT_ACK\n"; return false; } bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data&) { + std::cout << "NGCFT1: FT1_DATA\n"; return false; } bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack&) { + std::cout << "NGCFT1: FT1_DATA_ACK\n"; return false; } diff --git a/src/ngcft1.hpp b/src/ngcft1.hpp index 278f898..7fcf9d3 100644 --- a/src/ngcft1.hpp +++ b/src/ngcft1.hpp @@ -2,6 +2,7 @@ // solanaceae port of tox_ngc_ft1 +#include #include #include "./ngcext.hpp" @@ -11,6 +12,7 @@ #include "./snd_buf.hpp" #include +#include // uint32_t - same as tox friend ft // TODO: fill in toxfriend file kinds @@ -127,6 +129,7 @@ enum class NGCFT1_file_kind : uint32_t { class NGCFT1 : public ToxEventI, public NGCEXTEventI { + ToxI& _t; ToxEventProviderI& _tep; NGCEXTEventProviderI& _neep; @@ -190,9 +193,19 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI { }; std::map groups; + protected: + bool sendPKG_FT1_REQUEST(uint32_t group_number, uint32_t peer_number, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size); + bool sendPKG_FT1_INIT(uint32_t group_number, uint32_t peer_number, uint32_t file_kind, uint64_t file_size, uint8_t transfer_id, const uint8_t* file_id, size_t file_id_size); + bool sendPKG_FT1_INIT_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id); + bool sendPKG_FT1_DATA(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, uint16_t sequence_id, const uint8_t* data, size_t data_size); + bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size); + + void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set); + void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer); public: NGCFT1( + ToxI& t, ToxEventProviderI& tep, NGCEXTEventProviderI& neep );