Compare commits
6 Commits
57575330dd
...
broken_som
Author | SHA1 | Date | |
---|---|---|---|
ae3dc74933 | |||
0eb30246a8 | |||
c52ac19285 | |||
1231e792a7 | |||
319e754aff | |||
a4201f4407 |
@ -64,6 +64,8 @@ struct CCAI {
|
||||
// get the list of timed out seq_ids
|
||||
virtual std::vector<SeqIDType> getTimeouts(void) const = 0;
|
||||
|
||||
virtual int64_t inFlightCount(void) const { return -1; }
|
||||
|
||||
public: // callbacks
|
||||
// data size is without overhead
|
||||
virtual void onSent(SeqIDType seq, size_t data_size) = 0;
|
||||
|
@ -3,14 +3,25 @@
|
||||
#include <cmath>
|
||||
#include <iostream>
|
||||
|
||||
void CUBIC::updateReductionTimer(float time_delta) {
|
||||
const auto now {getTimeNow()};
|
||||
|
||||
// only keep updating while the cca interaction is not too long ago
|
||||
if (now - _time_point_last_update <= getCurrentDelay()*4.f) {
|
||||
_time_since_reduction += time_delta;
|
||||
}
|
||||
}
|
||||
|
||||
void CUBIC::resetReductionTimer(void) {
|
||||
_time_since_reduction = 0.f;
|
||||
}
|
||||
|
||||
float CUBIC::getCWnD(void) const {
|
||||
const double K = cbrt(
|
||||
(_window_max * (1. - BETA)) / SCALING_CONSTANT
|
||||
);
|
||||
|
||||
const double time_since_reduction = getTimeNow() - _time_point_reduction;
|
||||
|
||||
const double TK = time_since_reduction - K;
|
||||
const double TK = _time_since_reduction - K;
|
||||
|
||||
const double cwnd =
|
||||
SCALING_CONSTANT
|
||||
@ -34,13 +45,14 @@ float CUBIC::getCWnD(void) const {
|
||||
|
||||
void CUBIC::onCongestion(void) {
|
||||
// 8 is probably too much (800ms for 100ms rtt)
|
||||
if (getTimeNow() - _time_point_reduction >= getCurrentDelay()*4.f) {
|
||||
const auto tmp_old_tp = getTimeNow() - _time_point_reduction;
|
||||
if (_time_since_reduction >= getCurrentDelay()*4.f) {
|
||||
const auto tmp_old_tp = _time_since_reduction;
|
||||
|
||||
const auto current_cwnd = getCWnD(); // TODO: remove, only used by logging?
|
||||
const auto current_wnd = getWindow(); // respects cwnd and fwnd
|
||||
|
||||
_time_point_reduction = getTimeNow();
|
||||
_bytes_leftover = 0;
|
||||
resetReductionTimer();
|
||||
|
||||
if (current_cwnd < _window_max) {
|
||||
// congestion before reaching the inflection point (prev window_max).
|
||||
@ -72,12 +84,14 @@ float CUBIC::getWindow(void) {
|
||||
int64_t CUBIC::canSend(float time_delta) {
|
||||
const auto fspace_pkgs = FlowOnly::canSend(time_delta);
|
||||
|
||||
updateReductionTimer(time_delta);
|
||||
|
||||
if (fspace_pkgs == 0u) {
|
||||
return 0u;
|
||||
}
|
||||
|
||||
const auto window = getCWnD();
|
||||
int64_t cspace_bytes = window - _in_flight_bytes;
|
||||
int64_t cspace_bytes = (window - _in_flight_bytes) + _bytes_leftover;
|
||||
if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) {
|
||||
return 0u;
|
||||
}
|
||||
@ -93,6 +107,8 @@ int64_t CUBIC::canSend(float time_delta) {
|
||||
// limit to whole packets
|
||||
int64_t cspace_pkgs = (cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
|
||||
_bytes_leftover = cspace_bytes - cspace_pkgs;
|
||||
|
||||
return std::min(cspace_pkgs, fspace_pkgs);
|
||||
}
|
||||
|
||||
|
@ -17,9 +17,14 @@ struct CUBIC : public FlowOnly {
|
||||
// window size before last reduciton
|
||||
double _window_max {2.f * MAXIMUM_SEGMENT_SIZE}; // start with mss*2
|
||||
//double _window_last_max {2.f * MAXIMUM_SEGMENT_SIZE};
|
||||
double _time_point_reduction {getTimeNow()};
|
||||
|
||||
double _time_since_reduction {12.f}; // warm start
|
||||
int64_t _bytes_leftover {0};
|
||||
|
||||
private:
|
||||
void updateReductionTimer(float time_delta);
|
||||
void resetReductionTimer(void);
|
||||
|
||||
float getCWnD(void) const;
|
||||
|
||||
// moving avg over the last few delay samples
|
||||
|
@ -98,6 +98,10 @@ std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
|
||||
return list;
|
||||
}
|
||||
|
||||
int64_t FlowOnly::inFlightCount(void) const {
|
||||
return _in_flight.size();
|
||||
}
|
||||
|
||||
void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
|
||||
if constexpr (true) {
|
||||
for (const auto& it : _in_flight) {
|
||||
@ -105,15 +109,17 @@ void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
|
||||
}
|
||||
}
|
||||
|
||||
_in_flight.push_back(
|
||||
const auto& new_entry = _in_flight.emplace_back(
|
||||
FlyingBunch{
|
||||
seq,
|
||||
static_cast<float>(getTimeNow()),
|
||||
data_size + SEGMENT_OVERHEAD
|
||||
data_size + SEGMENT_OVERHEAD,
|
||||
false
|
||||
}
|
||||
);
|
||||
_in_flight_bytes += data_size + SEGMENT_OVERHEAD;
|
||||
//_recently_sent_bytes += data_size + SEGMENT_OVERHEAD;
|
||||
_in_flight_bytes += new_entry.bytes;
|
||||
|
||||
_time_point_last_update = getTimeNow();
|
||||
}
|
||||
|
||||
void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
|
||||
@ -124,6 +130,8 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
|
||||
|
||||
const auto now {getTimeNow()};
|
||||
|
||||
_time_point_last_update = now;
|
||||
|
||||
// first seq in seqs is the actual value, all extra are for redundency
|
||||
{ // skip in ack is congestion event
|
||||
// 1. look at primary ack of packet
|
||||
|
@ -38,6 +38,9 @@ struct FlowOnly : public CCAI {
|
||||
|
||||
clock::time_point _time_start_offset;
|
||||
|
||||
// used to clamp growth rate in the void
|
||||
double _time_point_last_update {getTimeNow()};
|
||||
|
||||
protected:
|
||||
// make values relative to algo start for readability (and precision)
|
||||
// get timestamp in seconds
|
||||
@ -72,6 +75,8 @@ struct FlowOnly : public CCAI {
|
||||
// get the list of timed out seq_ids
|
||||
std::vector<SeqIDType> getTimeouts(void) const override;
|
||||
|
||||
int64_t inFlightCount(void) const override;
|
||||
|
||||
public: // callbacks
|
||||
// data size is without overhead
|
||||
void onSent(SeqIDType seq, size_t data_size) override;
|
||||
|
@ -217,7 +217,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
// timeout increases with active transfers (otherwise we could starve them)
|
||||
if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) {
|
||||
// no ack after 30sec, close ft
|
||||
std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting\n";
|
||||
std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting (ifc:" << peer.cca->inFlightCount() << ")\n";
|
||||
dispatch(
|
||||
NGCFT1_Event::send_done,
|
||||
Events::NGCFT1_send_done{
|
||||
@ -316,7 +316,7 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
|
||||
}
|
||||
}
|
||||
|
||||
// change iterat start position to not starve transfers in the back
|
||||
// change iterate start position to not starve transfers in the back
|
||||
size_t iterated_count = 0;
|
||||
bool last_send_found = false;
|
||||
for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) {
|
||||
@ -686,7 +686,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
|
||||
|
||||
// delete if all packets acked
|
||||
if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) {
|
||||
std::cout << "NGCFT1: " << int(e.transfer_id) << " done\n";
|
||||
std::cout << "NGCFT1: " << int(e.transfer_id) << " done. wnd:" << peer.cca->getWindow() << "\n";
|
||||
dispatch(
|
||||
NGCFT1_Event::send_done,
|
||||
Events::NGCFT1_send_done{
|
||||
|
Reference in New Issue
Block a user