diff --git a/CMakeLists.txt b/CMakeLists.txt index 9818052..60c0fe6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,8 @@ add_library(solanaceae_ngcft1 ./solanaceae/ngc_ft1/ngcft1.cpp ./solanaceae/ngc_ft1/cca.hpp + ./solanaceae/ngc_ft1/flow_only.hpp + ./solanaceae/ngc_ft1/flow_only.cpp ./solanaceae/ngc_ft1/ledbat.hpp ./solanaceae/ngc_ft1/ledbat.cpp ./solanaceae/ngc_ft1/cubic.hpp diff --git a/solanaceae/ngc_ft1/cca.hpp b/solanaceae/ngc_ft1/cca.hpp index 2f9060c..d89acf8 100644 --- a/solanaceae/ngc_ft1/cca.hpp +++ b/solanaceae/ngc_ft1/cca.hpp @@ -49,7 +49,7 @@ struct CCAI { // TODO: api for how much data we should send // take time since last sent into account // respect max_byterate_allowed - virtual size_t canSend(void) const = 0; + virtual size_t canSend(void) = 0; // get the list of timed out seq_ids virtual std::vector getTimeouts(void) const = 0; diff --git a/solanaceae/ngc_ft1/cubic.cpp b/solanaceae/ngc_ft1/cubic.cpp index b705c5f..77a50f8 100644 --- a/solanaceae/ngc_ft1/cubic.cpp +++ b/solanaceae/ngc_ft1/cubic.cpp @@ -30,7 +30,7 @@ void CUBIC::addRTT(float new_delay) { _rtt_ema = RTT_EMA_ALPHA * new_delay + (1.f - RTT_EMA_ALPHA) * _rtt_ema; } -size_t CUBIC::canSend(void) const { +size_t CUBIC::canSend(void) { return 0; } diff --git a/solanaceae/ngc_ft1/cubic.hpp b/solanaceae/ngc_ft1/cubic.hpp index 953b8b4..b73489e 100644 --- a/solanaceae/ngc_ft1/cubic.hpp +++ b/solanaceae/ngc_ft1/cubic.hpp @@ -31,8 +31,8 @@ struct CUBIC : public CCAI { // make values relative to algo start for readability (and precision) // get timestamp in seconds - float getTimeNow(void) const { - return std::chrono::duration{clock::now() - _time_start_offset}.count(); + double getTimeNow(void) const { + return std::chrono::duration{clock::now() - _time_start_offset}.count(); } // moving avg over the last few delay samples @@ -47,7 +47,7 @@ struct CUBIC : public CCAI { // TODO: api for how much data we should send // take time since last sent into account // respect max_byterate_allowed - size_t canSend(void) const override; + size_t canSend(void) override; // get the list of timed out seq_ids std::vector getTimeouts(void) const override; diff --git a/solanaceae/ngc_ft1/flow_only.cpp b/solanaceae/ngc_ft1/flow_only.cpp new file mode 100644 index 0000000..4643931 --- /dev/null +++ b/solanaceae/ngc_ft1/flow_only.cpp @@ -0,0 +1,149 @@ +#include "./flow_only.hpp" + +#include +#include +#include +#include + +float FlowOnly::getCurrentDelay(void) const { + return std::min(_rtt_ema, RTT_MAX); +} + +void FlowOnly::addRTT(float new_delay) { + // lerp(new_delay, rtt_ema, 0.1) + _rtt_ema = RTT_EMA_ALPHA * new_delay + (1.f - RTT_EMA_ALPHA) * _rtt_ema; +} + +void FlowOnly::updateWindow(void) { + const float current_delay {getCurrentDelay()}; + + _fwnd = max_byterate_allowed * current_delay; + //_fwnd *= 1.3f; // try do balance conservative algo a bit, current_delay +} + +size_t FlowOnly::canSend(void) { + if (_in_flight.empty()) { + assert(_in_flight_bytes == 0); + return MAXIMUM_SEGMENT_DATA_SIZE; + } + + updateWindow(); + + const int64_t fspace = _fwnd - _in_flight_bytes; + if (fspace < MAXIMUM_SEGMENT_DATA_SIZE) { + return 0u; + } + + // limit to whole packets + size_t space = std::ceil(fspace / MAXIMUM_SEGMENT_DATA_SIZE) + * MAXIMUM_SEGMENT_DATA_SIZE; + + return space; +} + +std::vector FlowOnly::getTimeouts(void) const { + std::vector list; + + // after 3 rtt delay, we trigger timeout + const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f; + + for (const auto& [seq, time_stamp, size] : _in_flight) { + if (now_adjusted > time_stamp) { + list.push_back(seq); + } + } + + return list; +} + +void FlowOnly::onSent(SeqIDType seq, size_t data_size) { + if constexpr (true) { + for (const auto& it : _in_flight) { + assert(std::get<0>(it) != seq); + } + } + + _in_flight.push_back({seq, getTimeNow(), data_size + SEGMENT_OVERHEAD}); + _in_flight_bytes += data_size + SEGMENT_OVERHEAD; + //_recently_sent_bytes += data_size + SEGMENT_OVERHEAD; +} + +void FlowOnly::onAck(std::vector seqs) { + if (seqs.empty()) { + assert(false && "got empty list of acks???"); + return; + } + + const auto now {getTimeNow()}; + + // first seq in seqs is the actual value, all extra are for redundency + { // skip in ack is congestion event + // 1. look at primary ack of packet + auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq = seqs.front()](const auto& v) -> bool { + return std::get<0>(v) == seq; + }); + if (it != _in_flight.end()) { + if (it != _in_flight.begin()) { + // not next expected seq -> skip detected + // TODO: congestion event + //if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) { + //_recently_lost_data = true; + //_last_congestion_event = getTimeNow(); + //_last_congestion_rtt = getCurrentDelay(); + //} + } else { + // only mesure delay, if not a congestion + addRTT(now - std::get<1>(*it)); + } + } + } + + for (const auto& seq : seqs) { + auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool { + return std::get<0>(v) == seq; + }); + + if (it == _in_flight.end()) { + continue; // not found, ignore + } else { + //most_recent = std::max(most_recent, std::get<1>(*it)); + _in_flight_bytes -= std::get<2>(*it); + assert(_in_flight_bytes >= 0); + //_recently_acked_data += std::get<2>(*it); + _in_flight.erase(it); + } + } +} + +void FlowOnly::onLoss(SeqIDType seq, bool discard) { + auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool { + assert(!std::isnan(std::get<1>(v))); + return std::get<0>(v) == seq; + }); + + if (it == _in_flight.end()) { + // error + return; // not found, ignore ?? + } + + std::cerr << "FLOW loss\n"; + + // "if data lost is not to be retransmitted" + if (discard) { + _in_flight_bytes -= std::get<2>(*it); + assert(_in_flight_bytes >= 0); + _in_flight.erase(it); + } + // TODO: reset timestamp? + +#if 0 // temporarily disable ce for timeout + // at most once per rtt? + // TODO: use delay at event instead + if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) { + _recently_lost_data = true; + _last_congestion_event = getTimeNow(); + _last_congestion_rtt = getCurrentDelay(); + } +#endif +} + diff --git a/solanaceae/ngc_ft1/flow_only.hpp b/solanaceae/ngc_ft1/flow_only.hpp new file mode 100644 index 0000000..51e72b4 --- /dev/null +++ b/solanaceae/ngc_ft1/flow_only.hpp @@ -0,0 +1,70 @@ +#pragma once + +#include "./cca.hpp" + +#include +#include +#include + +struct FlowOnly : public CCAI { + using clock = std::chrono::steady_clock; + + public: // config + static constexpr float RTT_EMA_ALPHA = 0.1f; // might need over time + static constexpr float RTT_MAX = 2.f; // 2 sec is probably too much + + //float max_byterate_allowed {1.f*1024*1024}; // 1MiB/s + //float max_byterate_allowed {0.6f*1024*1024}; // 600MiB/s + float max_byterate_allowed {0.5f*1024*1024}; // 500MiB/s + //float max_byterate_allowed {0.05f*1024*1024}; // 50KiB/s + //float max_byterate_allowed {0.15f*1024*1024}; // 150KiB/s + + private: + // initialize to low value, will get corrected very fast + float _fwnd {0.01f * max_byterate_allowed}; // in bytes + + // rtt exponental moving average + float _rtt_ema {0.1f}; + + // list of sequence ids and timestamps of when they where sent (and payload size) + std::vector> _in_flight; + int64_t _in_flight_bytes {0}; + + clock::time_point _time_start_offset; + + private: + // make values relative to algo start for readability (and precision) + // get timestamp in seconds + double getTimeNow(void) const { + return std::chrono::duration{clock::now() - _time_start_offset}.count(); + } + + // moving avg over the last few delay samples + // VERY sensitive to bundling acks + float getCurrentDelay(void) const; + + void addRTT(float new_delay); + + void updateWindow(void); + + public: // api + FlowOnly(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {} + + // TODO: api for how much data we should send + // take time since last sent into account + // respect max_byterate_allowed + size_t canSend(void) override; + + // get the list of timed out seq_ids + std::vector getTimeouts(void) const override; + + public: // callbacks + // data size is without overhead + void onSent(SeqIDType seq, size_t data_size) override; + + void onAck(std::vector seqs) override; + + // if discard, not resent, not inflight + void onLoss(SeqIDType seq, bool discard) override; +}; + diff --git a/solanaceae/ngc_ft1/ledbat.cpp b/solanaceae/ngc_ft1/ledbat.cpp index 52e6265..2cdbe42 100644 --- a/solanaceae/ngc_ft1/ledbat.cpp +++ b/solanaceae/ngc_ft1/ledbat.cpp @@ -1,5 +1,4 @@ #include "./ledbat.hpp" -#include "solanaceae/ngc_ft1/cca.hpp" #include #include @@ -20,7 +19,7 @@ LEDBAT::LEDBAT(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_siz _time_start_offset = clock::now(); } -size_t LEDBAT::canSend(void) const { +size_t LEDBAT::canSend(void) { if (_in_flight.empty()) { return MAXIMUM_SEGMENT_DATA_SIZE; } diff --git a/solanaceae/ngc_ft1/ledbat.hpp b/solanaceae/ngc_ft1/ledbat.hpp index 6f0def5..c34a86b 100644 --- a/solanaceae/ngc_ft1/ledbat.hpp +++ b/solanaceae/ngc_ft1/ledbat.hpp @@ -61,7 +61,7 @@ struct LEDBAT : public CCAI{ // TODO: api for how much data we should send // take time since last sent into account // respect max_byterate_allowed - size_t canSend(void) const override; + size_t canSend(void) override; // get the list of timed out seq_ids std::vector getTimeouts(void) const override; diff --git a/solanaceae/ngc_ft1/ngcft1.cpp b/solanaceae/ngc_ft1/ngcft1.cpp index 25b194a..bd86bc9 100644 --- a/solanaceae/ngc_ft1/ngcft1.cpp +++ b/solanaceae/ngc_ft1/ngcft1.cpp @@ -143,7 +143,7 @@ bool NGCFT1::sendPKG_FT1_MESSAGE( return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK; } -void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set) { +void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set) { auto& tf_opt = peer.send_transfers.at(idx); assert(tf_opt.has_value()); auto& tf = tf_opt.value(); @@ -309,7 +309,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) { auto timeouts = peer.cca->getTimeouts(); - std::set timeouts_set{timeouts.cbegin(), timeouts.cend()}; + std::set timeouts_set{timeouts.cbegin(), timeouts.cend()}; for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) { if (peer.send_transfers.at(idx).has_value()) { @@ -617,7 +617,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) { transfer.time_since_activity = 0.f; - std::vector seqs; + std::vector seqs; for (const auto it : e.sequence_ids) { // TODO: improve this o.o seqs.push_back({e.transfer_id, it}); @@ -712,7 +712,7 @@ bool NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) { } // reset cca - peer.cca = std::make_unique(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4 + peer.cca = std::make_unique(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4 return false; } diff --git a/solanaceae/ngc_ft1/ngcft1.hpp b/solanaceae/ngc_ft1/ngcft1.hpp index 160f632..b8560a8 100644 --- a/solanaceae/ngc_ft1/ngcft1.hpp +++ b/solanaceae/ngc_ft1/ngcft1.hpp @@ -6,6 +6,7 @@ #include #include +#include "./flow_only.hpp" #include "./ledbat.hpp" #include "./rcv_buf.hpp" @@ -139,7 +140,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider struct Group { struct Peer { - std::unique_ptr cca = std::make_unique(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4 + std::unique_ptr cca = std::make_unique(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4 struct RecvTransfer { uint32_t file_kind; @@ -199,7 +200,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size); bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size); - void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set); + void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set); void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer); public: