From dbcf2777c0ff34b741502963d1d7ba520855239f Mon Sep 17 00:00:00 2001 From: Green Sky Date: Wed, 11 Jan 2023 19:00:49 +0100 Subject: [PATCH] tf now more or less working --- ngc_ft1.cpp | 186 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 159 insertions(+), 27 deletions(-) diff --git a/ngc_ft1.cpp b/ngc_ft1.cpp index 4889d41..b6e5054 100644 --- a/ngc_ft1.cpp +++ b/ngc_ft1.cpp @@ -3,6 +3,7 @@ #include "ngc_ext_common.hpp" #include +#include #include #include #include @@ -10,7 +11,7 @@ struct SendSequenceBuffer { struct SSBEntry { - std::vector data; // the data (variable size, but smallen than 500) + std::vector data; // the data (variable size, but smaller than 500) float time_since_activity {0.f}; }; @@ -42,6 +43,60 @@ struct SendSequenceBuffer { } }; +struct RecvSequenceBuffer { + struct RSBEntry { + std::vector data; + }; + + // sequence_id -> entry + std::map entries; + + uint16_t next_seq_id {0}; + + // list of seq_ids to ack, this is seperate bc rsbentries are deleted once processed + std::deque ack_seq_ids; + + void erase(uint16_t seq) { + entries.erase(seq); + } + + // inflight chunks + size_t size(void) const { + return entries.size(); + } + + void add(uint16_t seq_id, std::vector&& data) { + entries[seq_id] = {data}; + ack_seq_ids.push_back(seq_id); + if (ack_seq_ids.size() > 5) { // TODO: magic + ack_seq_ids.pop_front(); + } + } + + bool canPop(void) const { + return entries.count(next_seq_id); + } + + std::vector pop(void) { + assert(canPop()); + auto tmp_data = entries.at(next_seq_id).data; + erase(next_seq_id); + next_seq_id++; + return tmp_data; + } + + // for acking, might be bad since its front + std::vector frontSeqIDs(size_t count = 5) const { + std::vector seq_ids; + auto it = entries.cbegin(); + for (size_t i = 0; i < 5 && it != entries.cend(); i++, it++) { + seq_ids.push_back(it->first); + } + + return seq_ids; + } +}; + struct NGC_FT1 { NGC_FT1_options options; @@ -53,6 +108,7 @@ struct NGC_FT1 { struct Group { struct Peer { struct RecvTransfer { + NGC_FT1_file_kind file_kind; std::vector file_id; enum class State { @@ -64,8 +120,8 @@ struct NGC_FT1 { size_t file_size {0}; size_t file_size_current {0}; - // sequence array - // list of last x received seq_ids + // sequence id based reassembly + RecvSequenceBuffer rsb; }; std::array, 256> recv_transfers; size_t next_recv_transfer_idx {0}; // next id will be 0 @@ -130,8 +186,8 @@ bool NGC_FT1_init(NGC_EXT_CTX* ngc_ext_ctx, const struct NGC_FT1_options* option ngc_ext_ctx->callbacks[FT1_REQUEST] = _handle_FT1_REQUEST; ngc_ext_ctx->callbacks[FT1_INIT] = _handle_FT1_INIT; ngc_ext_ctx->callbacks[FT1_INIT_ACK] = _handle_FT1_INIT_ACK; - ngc_ext_ctx->callbacks[FT1_DATA] = nullptr; - ngc_ext_ctx->callbacks[FT1_DATA_ACK] = nullptr; + ngc_ext_ctx->callbacks[FT1_DATA] = _handle_FT1_DATA; + ngc_ext_ctx->callbacks[FT1_DATA_ACK] = _handle_FT1_DATA_ACK; return true; } @@ -189,6 +245,11 @@ void NGC_FT1_iterate(Tox *tox, NGC_EXT_CTX* ngc_ext_ctx/*, void *user_data*/) { std::vector new_data; size_t chunk_size = std::min(10u, tf.file_size - tf.file_size_current); + if (chunk_size == 0) { + // TODO: set to finishing? + break; // we done + } + new_data.resize(chunk_size); ngc_ext_ctx->ngc_ft1_ctx->cb_send_data[tf.file_kind](tox, ngc_ext_ctx, group_number, peer_number, idx, tf.file_size_current, new_data.data(), new_data.size()); @@ -239,14 +300,6 @@ void NGC_FT1_register_callback_recv_init( ngc_ext_ctx->ngc_ft1_ctx->cb_recv_init[file_kind] = callback; } -//void NGC_FT1_register_callback_begin_send( - //NGC_EXT_CTX* ngc_ext_ctx, - //NGC_FT1_file_kind file_kind, - //NGC_FT1_begin_send* callback -//) { - //ngc_ext_ctx->ngc_ft1_ctx->cb_begin_send[file_kind] = callback; -//} - void NGC_FT1_register_callback_recv_data( NGC_EXT_CTX* ngc_ext_ctx, NGC_FT1_file_kind file_kind, @@ -412,6 +465,7 @@ static bool _send_pkg_FT1_INIT_ACK(const Tox* tox, uint32_t group_number, uint32 } static bool _send_pkg_FT1_DATA(const Tox* tox, uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, uint16_t sequence_id, const uint8_t* data, size_t data_size) { + assert(data_size > 0); // TODO // check header_size+data_size <= max pkg size @@ -455,15 +509,6 @@ static bool _send_pkg_FT1_DATA_ACK(const Tox* tox, uint32_t group_number, uint32 //return tox_group_send_custom_private_packet(tox, group_number, peer_number, true, pkg.data(), pkg.size(), nullptr); //} -//static bool _send_pkg_FT1_DATA_FIN_ACK(const Tox* tox, uint32_t group_number, uint32_t peer_number, uint8_t transfer_id) { - //std::vector pkg; - //pkg.push_back(FT1_DATA_FIN_ACK); - //pkg.push_back(transfer_id); - - //// lossless? - //return tox_group_send_custom_private_packet(tox, group_number, peer_number, true, pkg.data(), pkg.size(), nullptr); -//} - #define _DATA_HAVE(x, error) if ((length - curser) < (x)) { error; } static void _handle_FT1_REQUEST( @@ -501,7 +546,6 @@ static void _handle_FT1_REQUEST( } } - static void _handle_FT1_INIT( Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, @@ -536,6 +580,7 @@ static void _handle_FT1_INIT( // - X bytes (file_kind dependent id, differnt sizes) + const std::vector file_id(data+curser, data+curser+(length-curser)); fprintf(stderr, "FT: got FT init with file_kind:%u file_size:%lu tf_id:%u [", file_kind_u8, file_size, transfer_id); for (size_t curser_copy = curser; curser_copy < length; curser_copy++) { fprintf(stderr, "%02X", data[curser_copy]); @@ -561,6 +606,18 @@ static void _handle_FT1_INIT( if (accept_ft) { _send_pkg_FT1_INIT_ACK(tox, group_number, peer_number, transfer_id); fprintf(stderr, "FT: accepted init\n"); + auto& peer = ngc_ext_ctx->ngc_ft1_ctx->groups[group_number].peers[peer_number]; + if (peer.recv_transfers[transfer_id].has_value()) { + fprintf(stderr, "FT: overwriting existing recv_transfer %d\n", transfer_id); + } + + peer.recv_transfers[transfer_id] = NGC_FT1::Group::Peer::RecvTransfer{ + file_kind, + file_id, + NGC_FT1::Group::Peer::RecvTransfer::State::INITED, + file_size, + 0u, + }; } else { // TODO deny? fprintf(stderr, "FT: rejected init\n"); @@ -611,6 +668,76 @@ static void _handle_FT1_INIT_ACK( transfer.time_since_activity = 0.f; } +static void _handle_FT1_DATA( + Tox* tox, + NGC_EXT_CTX* ngc_ext_ctx, + + uint32_t group_number, + uint32_t peer_number, + + const uint8_t *data, size_t length +) { + size_t curser = 0; + + // - 1 byte (transfer_id) + uint8_t transfer_id; + _DATA_HAVE(sizeof(transfer_id), fprintf(stderr, "FT: packet too small, missing transfer_id\n"); return) + transfer_id = data[curser++]; + + // - 2 bytes (sequence_id) + uint16_t sequence_id; + _DATA_HAVE(sizeof(sequence_id), fprintf(stderr, "FT: packet too small, missing sequence_id\n"); return) + sequence_id = data[curser++]; + sequence_id |= data[curser++] << (1*8); + + if (curser == length) { + fprintf(stderr, "FT: data of size 0!\n"); + return; + } + + auto& groups = ngc_ext_ctx->ngc_ft1_ctx->groups; + if (!groups.count(group_number)) { + fprintf(stderr, "FT: data for unknown group\n"); + return; + } + + NGC_FT1::Group::Peer& peer = groups[group_number].peers[peer_number]; + if (!peer.recv_transfers[transfer_id].has_value()) { + fprintf(stderr, "FT: data for unknown transfer\n"); + return; + } + + auto& transfer = peer.recv_transfers[transfer_id].value(); + + // do reassembly, ignore dups + transfer.rsb.add(sequence_id, std::vector(data+curser, data+curser+(length-curser))); + + NGC_FT1_recv_data_cb* fn_ptr = nullptr; + if (ngc_ext_ctx->ngc_ft1_ctx->cb_recv_data.count(transfer.file_kind)) { + fn_ptr = ngc_ext_ctx->ngc_ft1_ctx->cb_recv_data.at(transfer.file_kind); + } + + if (!fn_ptr) { + fprintf(stderr, "FT: missing cb for recv_data\n"); + return; + } + + // loop for chunks without holes + while (transfer.rsb.canPop()) { + auto data = transfer.rsb.pop(); + + fn_ptr(tox, ngc_ext_ctx, group_number, peer_number, transfer_id, transfer.file_size_current, data.data(), data.size()); + + transfer.file_size_current += data.size(); + } + + // send acks + std::vector ack_seq_ids(transfer.rsb.ack_seq_ids.cbegin(), transfer.rsb.ack_seq_ids.cend()); + if (!ack_seq_ids.empty()) { + _send_pkg_FT1_DATA_ACK(tox, group_number, peer_number, transfer_id, ack_seq_ids.data(), ack_seq_ids.size()); + } +} + static void _handle_FT1_DATA_ACK( Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, @@ -648,19 +775,24 @@ static void _handle_FT1_DATA_ACK( return; } - if ((length - curser) / sizeof(uint16_t)) { + _DATA_HAVE(sizeof(uint16_t), fprintf(stderr, "FT: packet too small, atleast 1 seq_id\n"); return) + + if ((length - curser) % sizeof(uint16_t) != 0) { fprintf(stderr, "FT: data_ack with misaligned data\n"); return; } while (curser < length) { - //pkg.push_back(seq_ids[i] & 0xff); - //pkg.push_back((seq_ids[i] >> (1*8)) & 0xff); uint16_t seq_id = data[curser++]; - seq_id = seq_id | (data[curser++] << (1*8)); + seq_id |= data[curser++] << (1*8); transfer.ssb.erase(seq_id); } + + if (transfer.file_size == transfer.file_size_current) { + fprintf(stderr, "FT: %d done\n", transfer_id); + peer.send_transfers[transfer_id] = std::nullopt; + } } #undef _DATA_HAVE