From 9ea1fbd725e64e1625a118f3379a5132daaec26a Mon Sep 17 00:00:00 2001 From: Green Sky Date: Tue, 8 Aug 2023 18:13:04 +0200 Subject: [PATCH] ft1 done --- src/ngcft1.cpp | 251 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 233 insertions(+), 18 deletions(-) diff --git a/src/ngcft1.cpp b/src/ngcft1.cpp index 9f29264..5ef1fbd 100644 --- a/src/ngcft1.cpp +++ b/src/ngcft1.cpp @@ -1,9 +1,12 @@ #include "./ngcft1.hpp" +#include + #include #include #include #include +#include bool NGCFT1::sendPKG_FT1_REQUEST( uint32_t group_number, uint32_t peer_number, @@ -128,11 +131,11 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ if (tf.time_since_activity >= init_retry_timeout_after) { if (tf.inits_sent >= 3) { // delete, timed out 3 times - fprintf(stderr, "FT: warning, ft init timed out, deleting\n"); + std::cerr << "NGCFT1 warning: ft init timed out, deleting\n"; tf_opt.reset(); } else { // timed out, resend - fprintf(stderr, "FT: warning, ft init timed out, resending\n"); + std::cerr << "NGCFT1 warning: ft init timed out, resending\n"; sendPKG_FT1_INIT(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size()); tf.inits_sent++; tf.time_since_activity = 0.f; @@ -156,7 +159,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ if (tf.time_since_activity >= sending_give_up_after) { // no ack after 30sec, close ft // TODO: notify app - fprintf(stderr, "FT: warning, sending ft in progress timed out, deleting\n"); + std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting\n"; // clean up cca tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { @@ -247,7 +250,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ if (tf.time_since_activity >= sending_give_up_after) { // no ack after 30sec, close ft // TODO: notify app - fprintf(stderr, "FT: warning, sending ft finishing timed out, deleting\n"); + std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n"; // clean up cca tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { @@ -259,7 +262,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ } break; default: // invalid state, delete - fprintf(stderr, "FT: error, ft in invalid state, deleting\n"); + std::cerr << "NGCFT1 error: ft in invalid state, deleting\n"; tf_opt.reset(); //continue; return; @@ -305,6 +308,8 @@ void NGCFT1::NGC_FT1_send_request_private( uint32_t file_kind, const uint8_t* file_id, size_t file_id_size ) { + // TODO: error check + sendPKG_FT1_REQUEST(group_number, peer_number, file_kind, file_id, file_id_size); } bool NGCFT1::NGC_FT1_send_init_private( @@ -314,31 +319,241 @@ bool NGCFT1::NGC_FT1_send_init_private( size_t file_size, uint8_t* transfer_id ) { - return false; + if (std::get<0>(_t.toxGroupPeerGetConnectionStatus(group_number, peer_number)).value_or(TOX_CONNECTION_NONE) == TOX_CONNECTION_NONE) { + std::cerr << "NGCFT1 error: cant init ft, peer offline\n"; + return false; + } + + auto& peer = groups[group_number].peers[peer_number]; + + // allocate transfer_id + size_t idx = peer.next_send_transfer_idx; + peer.next_send_transfer_idx = (peer.next_send_transfer_idx + 1) % 256; + { // TODO: extract + size_t i = idx; + bool found = false; + do { + if (!peer.send_transfers[i].has_value()) { + // free slot + idx = i; + found = true; + break; + } + + i = (i + 1) % 256; + } while (i != idx); + + if (!found) { + std::cerr << "NGCFT1 error: cant init ft, no free transfer slot\n"; + return false; + } + } + + // TODO: check return value + sendPKG_FT1_INIT(group_number, peer_number, file_kind, file_size, idx, file_id, file_id_size); + + peer.send_transfers[idx] = Group::Peer::SendTransfer{ + file_kind, + std::vector(file_id, file_id+file_id_size), + Group::Peer::SendTransfer::State::INIT_SENT, + 1, + 0.f, + file_size, + 0, + {}, // ssb + }; + + if (transfer_id != nullptr) { + *transfer_id = idx; + } + + return true; } -bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request&) { - std::cout << "NGCFT1: FT1_REQUEST\n"; - return false; +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) { + std::cout << "NGCFT1: FT1_REQUEST fk:" << e.file_kind << " [" << bin2hex(e.file_id) << "]\n"; + + // .... just rethrow?? + // TODO: dont + return dispatch( + NGCFT1_Event::recv_request, + Events::NGCFT1_recv_request{ + e.group_number, e.peer_number, + static_cast(e.file_kind), + e.file_id.data(), e.file_id.size() + } + ); } -bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init&) { - std::cout << "NGCFT1: FT1_INIT\n"; - return false; +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) { + std::cout << "NGCFT1: FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << e.transfer_id << " [" << bin2hex(e.file_id) << "]\n"; + + bool accept = false; + dispatch( + NGCFT1_Event::recv_init, + Events::NGCFT1_recv_init{ + e.group_number, e.peer_number, + static_cast(e.file_kind), + e.file_id.data(), e.file_id.size(), + e.transfer_id, + e.file_size, + accept + } + ); + + if (!accept) { + std::cout << "NGCFT1: rejected init\n"; + return true; // return true? + } + + sendPKG_FT1_INIT_ACK(e.group_number, e.peer_number, e.transfer_id); + + std::cout << "NGCFT1: accepted init\n"; + + auto& peer = groups[e.group_number].peers[e.peer_number]; + if (peer.recv_transfers[e.transfer_id].has_value()) { + std::cerr << "NGCFT1 warning: overwriting existing recv_transfer " << e.transfer_id << "\n"; + } + + peer.recv_transfers[e.transfer_id] = Group::Peer::RecvTransfer{ + e.file_kind, + e.file_id, + Group::Peer::RecvTransfer::State::INITED, + e.file_size, + 0u, + {} // rsb + }; + + return true; } -bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack&) { +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) { std::cout << "NGCFT1: FT1_INIT_ACK\n"; - return false; + + // we now should start sending data + + if (!groups.count(e.group_number)) { + std::cerr << "NGCFT1 warning: init_ack for unknown group\n"; + return true; + } + + Group::Peer& peer = groups[e.group_number].peers[e.peer_number]; + if (!peer.send_transfers[e.transfer_id].has_value()) { + std::cerr << "NGCFT1 warning: init_ack for unknown transfer\n"; + return true; + } + + Group::Peer::SendTransfer& transfer = peer.send_transfers[e.transfer_id].value(); + + using State = Group::Peer::SendTransfer::State; + if (transfer.state != State::INIT_SENT) { + std::cerr << "NGCFT1 error: inti_ack but not in INIT_SENT state\n"; + return true; + } + + // iterate will now call NGC_FT1_send_data_cb + transfer.state = State::SENDING; + transfer.time_since_activity = 0.f; + + return true; } -bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data&) { +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) { std::cout << "NGCFT1: FT1_DATA\n"; - return false; + + if (e.data.empty()) { + std::cerr << "NGCFT1 error: data of size 0!\n"; + return true; + } + + if (!groups.count(e.group_number)) { + std::cerr << "NGCFT1 warning: data for unknown group\n"; + return true; + } + + Group::Peer& peer = groups[e.group_number].peers[e.peer_number]; + if (!peer.recv_transfers[e.transfer_id].has_value()) { + std::cerr << "NGCFT1 warning: data for unknown transfer\n"; + return true; + } + + auto& transfer = peer.recv_transfers[e.transfer_id].value(); + + // do reassembly, ignore dups + transfer.rsb.add(e.sequence_id, std::vector(e.data)); // TODO: ugly explicit copy for what should just be a move + + // loop for chunks without holes + while (transfer.rsb.canPop()) { + auto data = transfer.rsb.pop(); + + // TODO: check return value + dispatch( + NGCFT1_Event::recv_data, + Events::NGCFT1_recv_data{ + e.group_number, e.peer_number, + e.transfer_id, + transfer.file_size_current, + data.data(), data.size() + } + ); + + transfer.file_size_current += data.size(); + } + + // send acks + std::vector ack_seq_ids(transfer.rsb.ack_seq_ids.cbegin(), transfer.rsb.ack_seq_ids.cend()); + // TODO: check if this caps at max acks + if (!ack_seq_ids.empty()) { + // TODO: check return value + sendPKG_FT1_DATA_ACK(e.group_number, e.peer_number, e.transfer_id, ack_seq_ids.data(), ack_seq_ids.size()); + } + + return true; } -bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack&) { +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) { std::cout << "NGCFT1: FT1_DATA_ACK\n"; - return false; + + if (!groups.count(e.group_number)) { + std::cerr << "NGCFT1 warning: data_ack for unknown group\n"; + return true; + } + + Group::Peer& peer = groups[e.group_number].peers[e.peer_number]; + if (!peer.send_transfers[e.transfer_id].has_value()) { + std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n"; + return true; + } + + Group::Peer::SendTransfer& transfer = peer.send_transfers[e.transfer_id].value(); + + using State = Group::Peer::SendTransfer::State; + if (transfer.state != State::SENDING && transfer.state != State::FINISHING) { + std::cerr << "NGCFT1 error: data_ack but not in SENDING or FINISHING state (" << int(transfer.state) << ")\n"; + return true; + } + + //if ((length - curser) % sizeof(uint16_t) != 0) { + //fprintf(stderr, "FT: data_ack with misaligned data\n"); + //return; + //} + + transfer.time_since_activity = 0.f; + + std::vector seqs; + for (const auto it : e.sequence_ids) { + // TODO: improve this o.o + seqs.push_back({e.transfer_id, it}); + transfer.ssb.erase(it); + } + peer.cca.onAck(seqs); + + // delete if all packets acked + if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) { + std::cout << "NGCFT1: " << e.transfer_id << " done\n"; + peer.send_transfers[e.transfer_id].reset(); + } + + return true; }