diff --git a/ngc_ft1.cpp b/ngc_ft1.cpp index 462ef00..4889d41 100644 --- a/ngc_ft1.cpp +++ b/ngc_ft1.cpp @@ -8,6 +8,40 @@ #include #include +struct SendSequenceBuffer { + struct SSBEntry { + std::vector data; // the data (variable size, but smallen than 500) + float time_since_activity {0.f}; + }; + + // sequence_id -> entry + std::map entries; + + uint16_t next_seq_id {0}; + + void erase(uint16_t seq) { + entries.erase(seq); + } + + // inflight chunks + size_t size(void) const { + return entries.size(); + } + + uint16_t add(std::vector&& data) { + entries[next_seq_id] = {data, 0.f}; + return next_seq_id++; + } + + template + void for_each(float time_delta, FN&& fn) { + for (auto& [id, entry] : entries) { + entry.time_since_activity += time_delta; + fn(id, entry.data, entry.time_since_activity); + } + } +}; + struct NGC_FT1 { NGC_FT1_options options; @@ -61,6 +95,7 @@ struct NGC_FT1 { // sequence array // list of sent but not acked seq_ids + SendSequenceBuffer ssb; }; std::array, 256> send_transfers; size_t next_send_transfer_idx {0}; // next id will be 0 @@ -82,9 +117,9 @@ static bool _send_pkg_FT1_DATA_ACK(const Tox* tox, uint32_t group_number, uint32 // handle pkgs static void _handle_FT1_REQUEST(Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, const uint8_t *data, size_t length); static void _handle_FT1_INIT(Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, const uint8_t *data, size_t length); -//static void _handle_FT1_INIT_ACK(Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, const uint8_t *data, size_t length); -//static void _handle_FT1_DATA(Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, const uint8_t *data, size_t length); -//static void _handle_FT1_DATA_ACK(Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, const uint8_t *data, size_t length); +static void _handle_FT1_INIT_ACK(Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, const uint8_t *data, size_t length); +static void _handle_FT1_DATA(Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, const uint8_t *data, size_t length); +static void _handle_FT1_DATA_ACK(Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, const uint8_t *data, size_t length); //static void _handle_FT1_DATA_FIN(Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, const uint8_t *data, size_t length); //static void _handle_FT1_DATA_FIN_ACK(Tox* tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, const uint8_t *data, size_t length); @@ -94,7 +129,7 @@ bool NGC_FT1_init(NGC_EXT_CTX* ngc_ext_ctx, const struct NGC_FT1_options* option ngc_ext_ctx->callbacks[FT1_REQUEST] = _handle_FT1_REQUEST; ngc_ext_ctx->callbacks[FT1_INIT] = _handle_FT1_INIT; - ngc_ext_ctx->callbacks[FT1_INIT_ACK] = nullptr; + ngc_ext_ctx->callbacks[FT1_INIT_ACK] = _handle_FT1_INIT_ACK; ngc_ext_ctx->callbacks[FT1_DATA] = nullptr; ngc_ext_ctx->callbacks[FT1_DATA_ACK] = nullptr; @@ -138,7 +173,33 @@ void NGC_FT1_iterate(Tox *tox, NGC_EXT_CTX* ngc_ext_ctx/*, void *user_data*/) { } } break; - case State::SENDING: + case State::SENDING: { + tf.ssb.for_each(0.025f, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + // no ack after 10 sec -> resend + if (time_since_activity >= 10.f) { + _send_pkg_FT1_DATA(tox, group_number, peer_number, idx, id, data.data(), data.size()); + time_since_activity = 0.f; + } + }); + + assert(ngc_ext_ctx->ngc_ft1_ctx->cb_send_data.count(tf.file_kind)); + + // if chunks in flight < window size (1 lol) + while (tf.ssb.size() < 1) { + std::vector new_data; + + size_t chunk_size = std::min(10u, tf.file_size - tf.file_size_current); + new_data.resize(chunk_size); + + ngc_ext_ctx->ngc_ft1_ctx->cb_send_data[tf.file_kind](tox, ngc_ext_ctx, group_number, peer_number, idx, tf.file_size_current, new_data.data(), new_data.size()); + uint16_t seq_id = tf.ssb.add(std::move(new_data)); + _send_pkg_FT1_DATA(tox, group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size()); + + fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id); + + tf.file_size_current += chunk_size; + } + } break; case State::FINISHING: break; @@ -151,58 +212,57 @@ void NGC_FT1_iterate(Tox *tox, NGC_EXT_CTX* ngc_ext_ctx/*, void *user_data*/) { continue; } } - - //struct SendTransfer { - //std::vector file_id; - - //enum class State { - //INIT_SENT, // keep this state until ack or deny or giveup - - //SENDING, // we got the ack and are now sending data - - //// is this real? - //FINISHING, // we sent all data but acks still outstanding???? - - //FINFIN, // we sent the data_fin and are waiting for the data_fin_ack - - //// delete - //} state; - - //// float time_since_last_activity ? - //size_t file_size {0}; - //size_t file_size_current {0}; - - //// sequence array - //// list of sent but not acked seq_ids - //}; - //std::array, 256> send_transfers; } } } } -void NGC_FT1_register_callback_recv_request(NGC_EXT_CTX* ngc_ext_ctx, NGC_FT1_file_kind file_kind, NGC_FT1_recv_request_cb* callback) { +void NGC_FT1_register_callback_recv_request( + NGC_EXT_CTX* ngc_ext_ctx, + NGC_FT1_file_kind file_kind, + NGC_FT1_recv_request_cb* callback +) { assert(ngc_ext_ctx); assert(ngc_ext_ctx->ngc_ft1_ctx); ngc_ext_ctx->ngc_ft1_ctx->cb_recv_request[file_kind] = callback; } -void NGC_FT1_register_callback_recv_init(NGC_EXT_CTX* ngc_ext_ctx, NGC_FT1_file_kind file_kind, NGC_FT1_recv_init_cb* callback) { +void NGC_FT1_register_callback_recv_init( + NGC_EXT_CTX* ngc_ext_ctx, + NGC_FT1_file_kind file_kind, + NGC_FT1_recv_init_cb* callback +) { assert(ngc_ext_ctx); assert(ngc_ext_ctx->ngc_ft1_ctx); ngc_ext_ctx->ngc_ft1_ctx->cb_recv_init[file_kind] = callback; } -void NGC_FT1_register_callback_recv_data(NGC_EXT_CTX* ngc_ext_ctx, NGC_FT1_file_kind file_kind, NGC_FT1_recv_data_cb* callback) { +//void NGC_FT1_register_callback_begin_send( + //NGC_EXT_CTX* ngc_ext_ctx, + //NGC_FT1_file_kind file_kind, + //NGC_FT1_begin_send* callback +//) { + //ngc_ext_ctx->ngc_ft1_ctx->cb_begin_send[file_kind] = callback; +//} + +void NGC_FT1_register_callback_recv_data( + NGC_EXT_CTX* ngc_ext_ctx, + NGC_FT1_file_kind file_kind, + NGC_FT1_recv_data_cb* callback +) { assert(ngc_ext_ctx); assert(ngc_ext_ctx->ngc_ft1_ctx); ngc_ext_ctx->ngc_ft1_ctx->cb_recv_data[file_kind] = callback; } -void NGC_FT1_register_callback_send_data(NGC_EXT_CTX* ngc_ext_ctx, NGC_FT1_file_kind file_kind, NGC_FT1_send_data_cb* callback) { +void NGC_FT1_register_callback_send_data( + NGC_EXT_CTX* ngc_ext_ctx, + NGC_FT1_file_kind file_kind, + NGC_FT1_send_data_cb* callback +) { assert(ngc_ext_ctx); assert(ngc_ext_ctx->ngc_ft1_ctx); @@ -507,5 +567,101 @@ static void _handle_FT1_INIT( } } +static void _handle_FT1_INIT_ACK( + Tox* tox, + NGC_EXT_CTX* ngc_ext_ctx, + + uint32_t group_number, + uint32_t peer_number, + + const uint8_t *data, + size_t length +) { + size_t curser = 0; + + // - 1 byte (transfer_id) + uint8_t transfer_id; + _DATA_HAVE(sizeof(transfer_id), fprintf(stderr, "FT: packet too small, missing transfer_id\n"); return) + transfer_id = data[curser++]; + + // we now should start sending data + + auto& groups = ngc_ext_ctx->ngc_ft1_ctx->groups; + if (!groups.count(group_number)) { + fprintf(stderr, "FT: init_ack for unknown group\n"); + return; + } + + NGC_FT1::Group::Peer& peer = groups[group_number].peers[peer_number]; + if (!peer.send_transfers[transfer_id].has_value()) { + fprintf(stderr, "FT: inti_ack for unknown transfer\n"); + return; + } + + NGC_FT1::Group::Peer::SendTransfer& transfer = peer.send_transfers[transfer_id].value(); + + using State = NGC_FT1::Group::Peer::SendTransfer::State; + if (transfer.state != State::INIT_SENT) { + fprintf(stderr, "FT: inti_ack but not in INIT_SENT state\n"); + return; + } + + // iterate will now call NGC_FT1_send_data_cb + transfer.state = State::SENDING; + transfer.time_since_activity = 0.f; +} + +static void _handle_FT1_DATA_ACK( + Tox* tox, + NGC_EXT_CTX* ngc_ext_ctx, + + uint32_t group_number, + uint32_t peer_number, + + const uint8_t *data, + size_t length +) { + size_t curser = 0; + + // - 1 byte (transfer_id) + uint8_t transfer_id; + _DATA_HAVE(sizeof(transfer_id), fprintf(stderr, "FT: packet too small, missing transfer_id\n"); return) + transfer_id = data[curser++]; + + auto& groups = ngc_ext_ctx->ngc_ft1_ctx->groups; + if (!groups.count(group_number)) { + fprintf(stderr, "FT: data_ack for unknown group\n"); + return; + } + + NGC_FT1::Group::Peer& peer = groups[group_number].peers[peer_number]; + if (!peer.send_transfers[transfer_id].has_value()) { + fprintf(stderr, "FT: data_ack for unknown transfer\n"); + return; + } + + NGC_FT1::Group::Peer::SendTransfer& transfer = peer.send_transfers[transfer_id].value(); + + using State = NGC_FT1::Group::Peer::SendTransfer::State; + if (transfer.state != State::SENDING) { + fprintf(stderr, "FT: data_ack but not in SENDING state\n"); + return; + } + + if ((length - curser) / sizeof(uint16_t)) { + fprintf(stderr, "FT: data_ack with misaligned data\n"); + return; + } + + while (curser < length) { + //pkg.push_back(seq_ids[i] & 0xff); + //pkg.push_back((seq_ids[i] >> (1*8)) & 0xff); + uint16_t seq_id = data[curser++]; + seq_id = seq_id | (data[curser++] << (1*8)); + + transfer.ssb.erase(seq_id); + } +} + #undef _DATA_HAVE diff --git a/ngc_ft1.h b/ngc_ft1.h index 8d5268b..e8e7351 100644 --- a/ngc_ft1.h +++ b/ngc_ft1.h @@ -89,13 +89,34 @@ typedef bool NGC_FT1_recv_init_cb(Tox *tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t g void NGC_FT1_register_callback_recv_init(NGC_EXT_CTX* ngc_ext_ctx, NGC_FT1_file_kind file_kind, NGC_FT1_recv_init_cb* callback); +typedef bool NGC_FT1_begin_send(Tox *tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, const uint8_t transfer_id); + // ========== data ========== -typedef void NGC_FT1_recv_data_cb(Tox *tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, size_t data_offset, const uint8_t* data, size_t data_size); +typedef void NGC_FT1_recv_data_cb( + Tox *tox, + NGC_EXT_CTX* ngc_ext_ctx, + + uint32_t group_number, + uint32_t peer_number, + uint8_t transfer_id, + + size_t data_offset, const uint8_t* data, size_t data_size +); void NGC_FT1_register_callback_recv_data(NGC_EXT_CTX* ngc_ext_ctx, NGC_FT1_file_kind file_kind, NGC_FT1_recv_data_cb* callback); -typedef void NGC_FT1_send_data_cb(Tox *tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, size_t data_offset, uint8_t** data, size_t* data_size); +// request to fill data_size bytes into data +typedef void NGC_FT1_send_data_cb( + Tox *tox, + NGC_EXT_CTX* ngc_ext_ctx, + + uint32_t group_number, + uint32_t peer_number, + uint8_t transfer_id, + + size_t data_offset, uint8_t* data, size_t data_size +); void NGC_FT1_register_callback_send_data(NGC_EXT_CTX* ngc_ext_ctx, NGC_FT1_file_kind file_kind, NGC_FT1_send_data_cb* callback); diff --git a/ngc_hs1.cpp b/ngc_hs1.cpp index 680974a..6d34e22 100644 --- a/ngc_hs1.cpp +++ b/ngc_hs1.cpp @@ -96,6 +96,16 @@ void _handle_HS1_ft_recv_data( const uint8_t* data, size_t data_size ); +void _handle_HS1_ft_send_data( + Tox *tox, NGC_EXT_CTX* ngc_ext_ctx, + + uint32_t group_number, + uint32_t peer_number, + uint8_t transfer_id, + + size_t data_offset, uint8_t* data, size_t data_size +); + bool NGC_HS1_init(NGC_EXT_CTX* ngc_ext_ctx, const struct NGC_HS1_options* options) { ngc_ext_ctx->ngc_hs1_ctx = new NGC_HS1; ngc_ext_ctx->ngc_hs1_ctx->options = *options; @@ -106,7 +116,7 @@ bool NGC_HS1_init(NGC_EXT_CTX* ngc_ext_ctx, const struct NGC_HS1_options* option NGC_FT1_register_callback_recv_request(ngc_ext_ctx, NGC_FT1_file_kind::NGC_HS1_MESSAGE_BY_ID, _handle_HS1_ft_recv_request); NGC_FT1_register_callback_recv_init(ngc_ext_ctx, NGC_FT1_file_kind::NGC_HS1_MESSAGE_BY_ID, _handle_HS1_ft_recv_init); NGC_FT1_register_callback_recv_data(ngc_ext_ctx, NGC_FT1_file_kind::NGC_HS1_MESSAGE_BY_ID, _handle_HS1_ft_recv_data); - //NGC_FT1_register_callback_send_data(ngc_ext_ctx, NGC_FT1_file_kind::NGC_HS1_MESSAGE_BY_ID, _handle_HS1_ft_init_message); + NGC_FT1_register_callback_send_data(ngc_ext_ctx, NGC_FT1_file_kind::NGC_HS1_MESSAGE_BY_ID, _handle_HS1_ft_send_data); return true; } @@ -144,7 +154,7 @@ static void _iterate_group(Tox *tox, NGC_EXT_CTX* ngc_ext_ctx, uint32_t group_nu if (peer.time_since_last_request_sent > ngc_hs1_ctx->options.query_interval_per_peer) { peer.time_since_last_request_sent = 0.f; - fprintf(stderr, "HS: requesting ids for %X%X%X%X\n", peer_key.data.data()[0], peer_key.data.data()[1], peer_key.data.data()[2], peer_key.data.data()[3]); + //fprintf(stderr, "HS: requesting ids for %X%X%X%X\n", peer_key.data.data()[0], peer_key.data.data()[1], peer_key.data.data()[2], peer_key.data.data()[3]); // TODO: other way around? // ask everyone if they have newer stuff for this peer @@ -430,7 +440,7 @@ bool _handle_HS1_ft_recv_init( const uint8_t transfer_id, const size_t file_size ) { - fprintf(stderr, "HS: -------hs handle ft init\n"); + //fprintf(stderr, "HS: -------hs handle ft init\n"); // peer id and msg id from file id // TODO: replace, remote crash @@ -513,6 +523,17 @@ void _handle_HS1_ft_recv_data( // TODO: data done? } +void _handle_HS1_ft_send_data( + Tox *tox, NGC_EXT_CTX* ngc_ext_ctx, + + uint32_t group_number, + uint32_t peer_number, + uint8_t transfer_id, + + size_t data_offset, uint8_t* data, size_t data_size +) { +} + #define _HS1_HAVE(x, error) if ((length - curser) < (x)) { error; } void _handle_HS1_REQUEST_LAST_IDS( @@ -537,7 +558,7 @@ void _handle_HS1_REQUEST_LAST_IDS( _HS1_HAVE(1, fprintf(stderr, "HS: packet too small, missing count\n"); return) uint8_t last_msg_id_count = data[curser++]; - fprintf(stderr, "HS: got request for last %u ids\n", last_msg_id_count); + //fprintf(stderr, "HS: got request for last %u ids\n", last_msg_id_count); // get group id _GroupKey g_id{};