light cca refator and expose some cca values to the outside

This commit is contained in:
Green Sky 2024-07-12 13:14:24 +02:00
parent 1d97dbe73d
commit 6e681aa3fd
No known key found for this signature in database
9 changed files with 110 additions and 36 deletions

View File

@ -54,7 +54,7 @@ struct CCAI {
virtual float getCurrentDelay(void) const = 0;
// return the current believed window in bytes of how much data can be inflight,
virtual float getWindow(void) = 0;
virtual float getWindow(void) const = 0;
// TODO: api for how much data we should send
// take time since last sent into account
@ -64,8 +64,12 @@ struct CCAI {
// get the list of timed out seq_ids
virtual std::vector<SeqIDType> getTimeouts(void) const = 0;
// returns -1 if not implemented, can return 0
virtual int64_t inFlightCount(void) const { return -1; }
// returns -1 if not implemented, can return 0
virtual int64_t inFlightBytes(void) const { return -1; }
public: // callbacks
// data size is without overhead
virtual void onSent(SeqIDType seq, size_t data_size) = 0;

View File

@ -76,7 +76,7 @@ void CUBIC::onCongestion(void) {
}
}
float CUBIC::getWindow(void) {
float CUBIC::getWindow(void) const {
return std::min<float>(getCWnD(), FlowOnly::getWindow());
}

View File

@ -2,13 +2,8 @@
#include "./flow_only.hpp"
#include <chrono>
struct CUBIC : public FlowOnly {
//using clock = std::chrono::steady_clock;
public: // config
//static constexpr float BETA {0.7f};
static constexpr float BETA {0.8f};
static constexpr float SCALING_CONSTANT {0.4f};
static constexpr float RTT_EMA_ALPHA = 0.1f; // 0.1 is very smooth, might need more
@ -26,35 +21,17 @@ struct CUBIC : public FlowOnly {
float getCWnD(void) const;
// moving avg over the last few delay samples
// VERY sensitive to bundling acks
//float getCurrentDelay(void) const;
//void addRTT(float new_delay);
void onCongestion(void) override;
public: // api
CUBIC(size_t maximum_segment_data_size) : FlowOnly(maximum_segment_data_size) {}
virtual ~CUBIC(void) {}
float getWindow(void) override;
float getWindow(void) const override;
// TODO: api for how much data we should send
// take time since last sent into account
// respect max_byterate_allowed
int64_t canSend(float time_delta) override;
// get the list of timed out seq_ids
//std::vector<SeqIDType> getTimeouts(void) const override;
public: // callbacks
// data size is without overhead
//void onSent(SeqIDType seq, size_t data_size) override;
//void onAck(std::vector<SeqIDType> seqs) override;
// if discard, not resent, not inflight
//void onLoss(SeqIDType seq, bool discard) override;
};

View File

@ -30,6 +30,7 @@ void FlowOnly::updateWindow(void) {
}
void FlowOnly::updateCongestion(void) {
updateWindow();
const auto tmp_window = getWindow();
// packet window * 0.3
// but atleast 4
@ -57,8 +58,7 @@ void FlowOnly::updateCongestion(void) {
}
}
float FlowOnly::getWindow(void) {
updateWindow();
float FlowOnly::getWindow(void) const {
return _fwnd;
}
@ -102,11 +102,18 @@ int64_t FlowOnly::inFlightCount(void) const {
return _in_flight.size();
}
int64_t FlowOnly::inFlightBytes(void) const {
return _in_flight_bytes;
}
void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
if constexpr (true) {
size_t sum {0u};
for (const auto& it : _in_flight) {
assert(it.id != seq);
sum += it.bytes;
}
assert(_in_flight_bytes == sum);
}
const auto& new_entry = _in_flight.emplace_back(

View File

@ -4,7 +4,6 @@
#include <chrono>
#include <vector>
#include <tuple>
struct FlowOnly : public CCAI {
protected:
@ -52,7 +51,8 @@ struct FlowOnly : public CCAI {
// VERY sensitive to bundling acks
float getCurrentDelay(void) const override;
float getWindow(void) override;
// call updateWindow() to update this value
float getWindow(void) const override;
void addRTT(float new_delay);
@ -76,6 +76,7 @@ struct FlowOnly : public CCAI {
std::vector<SeqIDType> getTimeouts(void) const override;
int64_t inFlightCount(void) const override;
int64_t inFlightBytes(void) const override;
public: // callbacks
// data size is without overhead

View File

@ -53,7 +53,7 @@ struct LEDBAT : public CCAI {
// return the current believed window in bytes of how much data can be inflight,
// without overstepping the delay requirement
float getWindow(void) override {
float getWindow(void) const override {
return _cwnd;
}

View File

@ -201,6 +201,29 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
// TODO: receiving tranfers?
}
const CCAI* NGCFT1::getPeerCCA(
uint32_t group_number,
uint32_t peer_number
) const {
auto group_it = groups.find(group_number);
if (group_it == groups.end()) {
return nullptr;;
}
auto peer_it = group_it->second.peers.find(peer_number);
if (peer_it == group_it->second.peers.end()) {
return nullptr;;
}
const auto& cca_ptr = peer_it->second.cca;
if (!cca_ptr) {
return nullptr;;
}
return cca_ptr.get();
}
NGCFT1::NGCFT1(
ToxI& t,
ToxEventProviderI& tep,
@ -337,6 +360,52 @@ bool NGCFT1::NGC_FT1_send_message_public(
return _neep.send_all_ft1_message(group_number, message_id, file_kind, file_id, file_id_size);
}
float NGCFT1::getPeerDelay(uint32_t group_number, uint32_t peer_number) const {
auto* cca_ptr = getPeerCCA(group_number, peer_number);
if (cca_ptr == nullptr) {
return -1.f;
}
return cca_ptr->getCurrentDelay();
}
float NGCFT1::getPeerWindow(uint32_t group_number, uint32_t peer_number) const {
auto* cca_ptr = getPeerCCA(group_number, peer_number);
if (cca_ptr == nullptr) {
return -1.f;
}
return cca_ptr->getWindow();
}
int64_t NGCFT1::getPeerInFlightPackets(
uint32_t group_number,
uint32_t peer_number
) const {
auto* cca_ptr = getPeerCCA(group_number, peer_number);
if (cca_ptr == nullptr) {
return -1;
}
return cca_ptr->inFlightCount();
}
int64_t NGCFT1::getPeerInFlightBytes(
uint32_t group_number,
uint32_t peer_number
) const {
auto* cca_ptr = getPeerCCA(group_number, peer_number);
if (cca_ptr == nullptr) {
return -1;
}
return cca_ptr->inFlightCount();
}
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) {
//#if !NDEBUG
std::cout << "NGCFT1: got FT1_REQUEST fk:" << e.file_kind << " [" << bin2hex(e.file_id) << "]\n";

View File

@ -205,6 +205,8 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
const CCAI* getPeerCCA(uint32_t group_number, uint32_t peer_number) const;
public:
NGCFT1(
ToxI& t,
@ -215,7 +217,6 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
float iterate(float delta);
public: // ft1 api
// TODO: public variant?
void NGC_FT1_send_request_private(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
@ -239,6 +240,23 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
const uint8_t* file_id, size_t file_id_size
);
public: // cca stuff
// rtt/delay
// negative on error or no cca
float getPeerDelay(uint32_t group_number, uint32_t peer_number) const;
// belived possible current window
// negative on error or no cca
float getPeerWindow(uint32_t group_number, uint32_t peer_number) const;
// packets in flight
// returns -1 if error or no cca
int64_t getPeerInFlightPackets(uint32_t group_number, uint32_t peer_number) const;
// actual bytes in flight (aka window)
// returns -1 if error or no cca
int64_t getPeerInFlightBytes(uint32_t group_number, uint32_t peer_number) const;
protected:
bool onEvent(const Events::NGCEXT_ft1_request&) override;
bool onEvent(const Events::NGCEXT_ft1_init&) override;

View File

@ -211,6 +211,8 @@ SHA1_NGCFT1::SHA1_NGCFT1(
{
// TODO: also create and destroy
_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
//_rmm.subscribe(this, RegistryMessageModel_Event::message_construct);
//_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);
_nft.subscribe(this, NGCFT1_Event::recv_request);
_nft.subscribe(this, NGCFT1_Event::recv_init);
@ -220,10 +222,6 @@ SHA1_NGCFT1::SHA1_NGCFT1(
_nft.subscribe(this, NGCFT1_Event::send_done);
_nft.subscribe(this, NGCFT1_Event::recv_message);
//_rmm.subscribe(this, RegistryMessageModel_Event::message_construct);
//_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
//_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);
_rmm.subscribe(this, RegistryMessageModel_Event::send_file_path);
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT);