From 0df0760c064d67fcce42912154cd926d51bd053f Mon Sep 17 00:00:00 2001 From: Green Sky Date: Thu, 11 Jan 2024 00:48:57 +0100 Subject: [PATCH] failing to send is now also a congestion event (hacky and only the first time we send data) --- solanaceae/ngc_ft1/flow_only.cpp | 62 ++++++++++++++++++------------ solanaceae/ngc_ft1/flow_only.hpp | 3 ++ solanaceae/ngc_ft1/ngcft1.cpp | 65 +++++++++++++++++--------------- 3 files changed, 76 insertions(+), 54 deletions(-) diff --git a/solanaceae/ngc_ft1/flow_only.cpp b/solanaceae/ngc_ft1/flow_only.cpp index 2a1499c..65b58a3 100644 --- a/solanaceae/ngc_ft1/flow_only.cpp +++ b/solanaceae/ngc_ft1/flow_only.cpp @@ -28,6 +28,34 @@ void FlowOnly::updateWindow(void) { _fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE); } +void FlowOnly::updateCongestion(void) { + const auto tmp_window = getWindow(); + // packet window * 0.3 + // but atleast 4 + int32_t max_consecutive_events = std::clamp( + (tmp_window/MAXIMUM_SEGMENT_DATA_SIZE) * 0.3f, + 4, + 50 // limit TODO: fix idle/time starved algo + ); + // TODO: magic number + +#if 0 + std::cout << "NGC_FT1 Flow: pkg out of order" + << " w:" << tmp_window + << " pw:" << tmp_window/MAXIMUM_SEGMENT_DATA_SIZE + << " coe:" << _consecutive_events + << " mcoe:" << max_consecutive_events + << "\n"; +#endif + + if (_consecutive_events > max_consecutive_events) { + //std::cout << "CONGESTION! NGC_FT1 flow: pkg out of order\n"; + onCongestion(); + + // TODO: set _consecutive_events to zero? + } +} + float FlowOnly::getWindow(void) { updateWindow(); return _fwnd; @@ -111,29 +139,7 @@ void FlowOnly::onAck(std::vector seqs) { _consecutive_events++; it->ignore = true; // only handle once - const auto tmp_window = getWindow(); - // packet window * 0.3 - // but atleast 4 - int32_t max_consecutive_events = std::clamp( - (tmp_window/MAXIMUM_SEGMENT_DATA_SIZE) * 0.3f, - 4, - 50 // limit TODO: fix idle/time starved algo - ); - // TODO: magic number - -#if 0 - std::cout << "NGC_FT1 Flow: pkg out of order" - << " w:" << tmp_window - << " pw:" << tmp_window/MAXIMUM_SEGMENT_DATA_SIZE - << " coe:" << _consecutive_events - << " mcoe:" << max_consecutive_events - << "\n"; -#endif - - if (_consecutive_events > max_consecutive_events) { - //std::cout << "CONGESTION! NGC_FT1 flow: pkg out of order\n"; - onCongestion(); - } + updateCongestion(); } else { // only mesure delay, if not a congestion addRTT(now - it->timestamp); @@ -190,6 +196,14 @@ void FlowOnly::onLoss(SeqIDType seq, bool discard) { it->ignore = true; } - // no ce, since this is usually after data arrived out-of-order/duplicate + // usually after data arrived out-of-order/duplicate + if (!it->ignore) { + it->ignore = true; // only handle once + //_consecutive_events++; + + //updateCongestion(); + // this is usually a safe indicator for congestion/maxed connection + onCongestion(); + } } diff --git a/solanaceae/ngc_ft1/flow_only.hpp b/solanaceae/ngc_ft1/flow_only.hpp index 4c1e624..20e2886 100644 --- a/solanaceae/ngc_ft1/flow_only.hpp +++ b/solanaceae/ngc_ft1/flow_only.hpp @@ -57,6 +57,9 @@ struct FlowOnly : public CCAI { virtual void onCongestion(void) {}; + // internal logic, calls the onCongestion() event + void updateCongestion(void); + public: // api FlowOnly(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {} diff --git a/solanaceae/ngc_ft1/ngcft1.cpp b/solanaceae/ngc_ft1/ngcft1.cpp index 085e55e..c019958 100644 --- a/solanaceae/ngc_ft1/ngcft1.cpp +++ b/solanaceae/ngc_ft1/ngcft1.cpp @@ -186,6 +186,30 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ } //break; return; + case State::FINISHING: // we still have unacked packets + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) { + sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size()); + peer.cca->onLoss({idx, id}, false); + time_since_activity = 0.f; + timeouts_set.erase({idx, id}); + can_packet_size -= data.size(); + } + }); + if (tf.time_since_activity >= sending_give_up_after) { + // no ack after 30sec, close ft + // TODO: notify app + std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n"; + + // clean up cca + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + peer.cca->onLoss({idx, id}, true); + timeouts_set.erase({idx, id}); + }); + + tf_opt.reset(); + } + break; case State::SENDING: { tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) { @@ -249,42 +273,21 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ ); uint16_t seq_id = tf.ssb.add(std::move(new_data)); - sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size()); - peer.cca->onSent({idx, seq_id}, chunk_size); - -#if defined(EXTRA_LOGGING) && EXTRA_LOGGING == 1 - fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id); -#endif + const bool sent = sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size()); + if (sent) { + peer.cca->onSent({idx, seq_id}, chunk_size); + } else { + std::cerr << "NGCFT1: failed to send packet (queue full?) --------------\n"; + peer.cca->onLoss({idx, seq_id}, false); // HACK: fake congestion event + // TODO: onCongestion + can_packet_size = 0; + } tf.file_size_current += chunk_size; can_packet_size -= chunk_size; } } break; - case State::FINISHING: // we still have unacked packets - tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { - if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) { - sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size()); - peer.cca->onLoss({idx, id}, false); - time_since_activity = 0.f; - timeouts_set.erase({idx, id}); - can_packet_size -= data.size(); - } - }); - if (tf.time_since_activity >= sending_give_up_after) { - // no ack after 30sec, close ft - // TODO: notify app - std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n"; - - // clean up cca - tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { - peer.cca->onLoss({idx, id}, true); - timeouts_set.erase({idx, id}); - }); - - tf_opt.reset(); - } - break; default: // invalid state, delete std::cerr << "NGCFT1 error: ft in invalid state, deleting\n"; tf_opt.reset(); @@ -362,6 +365,8 @@ float NGCFT1::iterate(float time_delta) { } if (transfer_in_progress) { + // ~15ms for up to 1mb/s + // ~5ms for up to 4mb/s return 0.005f; // 5ms } else { return 1.f; // once a sec might be too little