Compare commits

...

3 Commits

Author SHA1 Message Date
ae3dc74933
something here broke it
- accounting for rounded down bytes
2024-06-04 19:37:33 +02:00
0eb30246a8
small refactor and print in flight packages when timing out 2024-05-31 17:03:22 +02:00
c52ac19285
print window on done 2024-05-31 15:36:18 +02:00
6 changed files with 20 additions and 8 deletions

View File

@ -64,6 +64,8 @@ struct CCAI {
// get the list of timed out seq_ids // get the list of timed out seq_ids
virtual std::vector<SeqIDType> getTimeouts(void) const = 0; virtual std::vector<SeqIDType> getTimeouts(void) const = 0;
virtual int64_t inFlightCount(void) const { return -1; }
public: // callbacks public: // callbacks
// data size is without overhead // data size is without overhead
virtual void onSent(SeqIDType seq, size_t data_size) = 0; virtual void onSent(SeqIDType seq, size_t data_size) = 0;

View File

@ -51,6 +51,7 @@ void CUBIC::onCongestion(void) {
const auto current_cwnd = getCWnD(); // TODO: remove, only used by logging? const auto current_cwnd = getCWnD(); // TODO: remove, only used by logging?
const auto current_wnd = getWindow(); // respects cwnd and fwnd const auto current_wnd = getWindow(); // respects cwnd and fwnd
_bytes_leftover = 0;
resetReductionTimer(); resetReductionTimer();
if (current_cwnd < _window_max) { if (current_cwnd < _window_max) {
@ -90,7 +91,7 @@ int64_t CUBIC::canSend(float time_delta) {
} }
const auto window = getCWnD(); const auto window = getCWnD();
int64_t cspace_bytes = window - _in_flight_bytes; int64_t cspace_bytes = (window - _in_flight_bytes) + _bytes_leftover;
if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) { if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) {
return 0u; return 0u;
} }
@ -106,6 +107,8 @@ int64_t CUBIC::canSend(float time_delta) {
// limit to whole packets // limit to whole packets
int64_t cspace_pkgs = (cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE; int64_t cspace_pkgs = (cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
_bytes_leftover = cspace_bytes - cspace_pkgs;
return std::min(cspace_pkgs, fspace_pkgs); return std::min(cspace_pkgs, fspace_pkgs);
} }

View File

@ -19,6 +19,7 @@ struct CUBIC : public FlowOnly {
//double _window_last_max {2.f * MAXIMUM_SEGMENT_SIZE}; //double _window_last_max {2.f * MAXIMUM_SEGMENT_SIZE};
double _time_since_reduction {12.f}; // warm start double _time_since_reduction {12.f}; // warm start
int64_t _bytes_leftover {0};
private: private:
void updateReductionTimer(float time_delta); void updateReductionTimer(float time_delta);

View File

@ -98,6 +98,10 @@ std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
return list; return list;
} }
int64_t FlowOnly::inFlightCount(void) const {
return _in_flight.size();
}
void FlowOnly::onSent(SeqIDType seq, size_t data_size) { void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
if constexpr (true) { if constexpr (true) {
for (const auto& it : _in_flight) { for (const auto& it : _in_flight) {
@ -105,15 +109,15 @@ void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
} }
} }
_in_flight.push_back( const auto& new_entry = _in_flight.emplace_back(
FlyingBunch{ FlyingBunch{
seq, seq,
static_cast<float>(getTimeNow()), static_cast<float>(getTimeNow()),
data_size + SEGMENT_OVERHEAD data_size + SEGMENT_OVERHEAD,
false
} }
); );
_in_flight_bytes += data_size + SEGMENT_OVERHEAD; _in_flight_bytes += new_entry.bytes;
//_recently_sent_bytes += data_size + SEGMENT_OVERHEAD;
_time_point_last_update = getTimeNow(); _time_point_last_update = getTimeNow();
} }

View File

@ -75,6 +75,8 @@ struct FlowOnly : public CCAI {
// get the list of timed out seq_ids // get the list of timed out seq_ids
std::vector<SeqIDType> getTimeouts(void) const override; std::vector<SeqIDType> getTimeouts(void) const override;
int64_t inFlightCount(void) const override;
public: // callbacks public: // callbacks
// data size is without overhead // data size is without overhead
void onSent(SeqIDType seq, size_t data_size) override; void onSent(SeqIDType seq, size_t data_size) override;

View File

@ -217,7 +217,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
// timeout increases with active transfers (otherwise we could starve them) // timeout increases with active transfers (otherwise we could starve them)
if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) { if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) {
// no ack after 30sec, close ft // no ack after 30sec, close ft
std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting\n"; std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting (ifc:" << peer.cca->inFlightCount() << ")\n";
dispatch( dispatch(
NGCFT1_Event::send_done, NGCFT1_Event::send_done,
Events::NGCFT1_send_done{ Events::NGCFT1_send_done{
@ -316,7 +316,7 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
} }
} }
// change iterat start position to not starve transfers in the back // change iterate start position to not starve transfers in the back
size_t iterated_count = 0; size_t iterated_count = 0;
bool last_send_found = false; bool last_send_found = false;
for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) { for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) {
@ -686,7 +686,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
// delete if all packets acked // delete if all packets acked
if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) { if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) {
std::cout << "NGCFT1: " << int(e.transfer_id) << " done\n"; std::cout << "NGCFT1: " << int(e.transfer_id) << " done. wnd:" << peer.cca->getWindow() << "\n";
dispatch( dispatch(
NGCFT1_Event::send_done, NGCFT1_Event::send_done,
Events::NGCFT1_send_done{ Events::NGCFT1_send_done{