4 Commits

Author SHA1 Message Date
Green Sky
3d4e286a7c refactor sending transfers
always do timeouts and resending first
then roundrobin over sending new data
2025-12-13 16:54:37 +01:00
Green Sky
b7046bcb47 ignore resent in reorder detection 2025-12-13 15:37:01 +01:00
Green Sky
35a82cd67f make onLoss return if found 2025-12-13 13:36:51 +01:00
Green Sky
308790dc3a additional time slicing for flow 2025-12-13 13:33:42 +01:00
9 changed files with 178 additions and 160 deletions

View File

@@ -69,7 +69,8 @@ struct CCAI {
virtual void onAck(std::vector<SeqIDType> seqs) = 0; virtual void onAck(std::vector<SeqIDType> seqs) = 0;
// if discard, not resent, not inflight // if discard, not resent, not inflight
virtual void onLoss(SeqIDType seq, bool discard) = 0; // return if found
virtual bool onLoss(SeqIDType seq, bool discard) = 0;
// signal congestion externally (eg. send queue is full) // signal congestion externally (eg. send queue is full)
virtual void onCongestion(void) {}; virtual void onCongestion(void) {};

View File

@@ -102,6 +102,8 @@ int64_t CUBIC::canSend(float time_delta) {
// this is mostly to prevent spikes on empty windows // this is mostly to prevent spikes on empty windows
const auto rate = window / getCurrentDelay(); const auto rate = window / getCurrentDelay();
// TODO: time slicing alla flow
// we dont want this limit to fall below at least 1 segment // we dont want this limit to fall below at least 1 segment
const int64_t max_bytes_per_tick = std::max<int64_t>(rate * time_delta + 0.5f, MAXIMUM_SEGMENT_SIZE); const int64_t max_bytes_per_tick = std::max<int64_t>(rate * time_delta + 0.5f, MAXIMUM_SEGMENT_SIZE);
cspace_bytes = std::min<int64_t>(cspace_bytes, max_bytes_per_tick); cspace_bytes = std::min<int64_t>(cspace_bytes, max_bytes_per_tick);

View File

@@ -80,7 +80,15 @@ int64_t FlowOnly::canSend(float time_delta) {
// also limit to max sendrate per tick, which is usually smaller than window // also limit to max sendrate per tick, which is usually smaller than window
// this is mostly to prevent spikes on empty windows // this is mostly to prevent spikes on empty windows
fspace = std::min<int64_t>(fspace, max_byterate_allowed * time_delta + 0.5f); fspace = std::min<int64_t>({
fspace,
// slice window into time_delta sized chunks and only allow 1.5 chunks sized per tick
int64_t((1.5f * _fwnd) / time_delta + 0.5f),
// similar, but without current delay in the equation (fallback)
int64_t(1.2f * max_byterate_allowed * time_delta + 0.5f),
});
// limit to whole packets // limit to whole packets
return (fspace / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE; return (fspace / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
@@ -162,7 +170,7 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
// find first non ignore, it should be the expected // find first non ignore, it should be the expected
auto first_it = std::find_if_not(_in_flight.cbegin(), _in_flight.cend(), [](const auto& v) -> bool { return v.ignore; }); auto first_it = std::find_if_not(_in_flight.cbegin(), _in_flight.cend(), [](const auto& v) -> bool { return v.ignore; });
if (first_it != _in_flight.cend() && it != first_it) { if (first_it != _in_flight.cend() && it != first_it && !it->ignore) {
// not next expected seq -> skip detected // not next expected seq -> skip detected
_consecutive_events++; _consecutive_events++;
@@ -203,15 +211,18 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
} }
} }
void FlowOnly::onLoss(SeqIDType seq, bool discard) { bool FlowOnly::onLoss(SeqIDType seq, bool discard) {
auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool { auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
assert(!std::isnan(v.timestamp)); assert(!std::isnan(v.timestamp));
return v.id == seq; return v.id == seq;
}); });
// we care about it still being there, when we do not discard
if (it == _in_flight.end()) { if (it == _in_flight.end()) {
// error if (!discard) {
return; // not found, ignore ?? std::cerr << "FLOW seq not found!\n";
}
return false; // not found, ignore ??
} }
//std::cerr << "FLOW loss\n"; //std::cerr << "FLOW loss\n";
@@ -241,5 +252,7 @@ void FlowOnly::onLoss(SeqIDType seq, bool discard) {
// this is usually a safe indicator for congestion/maxed connection // this is usually a safe indicator for congestion/maxed connection
onCongestion(); onCongestion();
} }
return true;
} }

View File

@@ -86,6 +86,6 @@ struct FlowOnly : public CCAI {
void onAck(std::vector<SeqIDType> seqs) override; void onAck(std::vector<SeqIDType> seqs) override;
// if discard, not resent, not inflight // if discard, not resent, not inflight
void onLoss(SeqIDType seq, bool discard) override; bool onLoss(SeqIDType seq, bool discard) override;
}; };

View File

@@ -131,7 +131,7 @@ void LEDBAT::onAck(std::vector<SeqIDType> seqs) {
updateWindows(); updateWindows();
} }
void LEDBAT::onLoss(SeqIDType seq, bool discard) { bool LEDBAT::onLoss(SeqIDType seq, bool discard) {
auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool { auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
assert(!std::isnan(std::get<1>(v))); assert(!std::isnan(std::get<1>(v)));
return std::get<0>(v) == seq; return std::get<0>(v) == seq;
@@ -139,7 +139,7 @@ void LEDBAT::onLoss(SeqIDType seq, bool discard) {
if (it == _in_flight.end()) { if (it == _in_flight.end()) {
// error // error
return; // not found, ignore ?? return false; // not found, ignore ??
} }
if (PLOTTING) { if (PLOTTING) {
@@ -165,6 +165,8 @@ void LEDBAT::onLoss(SeqIDType seq, bool discard) {
#endif #endif
updateWindows(); updateWindows();
return true;
} }
float LEDBAT::getCurrentDelay(void) const { float LEDBAT::getCurrentDelay(void) const {

View File

@@ -72,7 +72,7 @@ struct LEDBAT : public CCAI {
void onAck(std::vector<SeqIDType> seqs) override; void onAck(std::vector<SeqIDType> seqs) override;
// if discard, not resent, not inflight // if discard, not resent, not inflight
void onLoss(SeqIDType seq, bool discard) override; bool onLoss(SeqIDType seq, bool discard) override;
private: private:
using clock = std::chrono::steady_clock; using clock = std::chrono::steady_clock;

View File

@@ -15,20 +15,19 @@
#include <cassert> #include <cassert>
#include <vector> #include <vector>
void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size) { void NGCFT1::updateSendTransferPhase1(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size) {
using State = Group::Peer::SendTransfer::State;
auto& tf_opt = peer.send_transfers.at(idx); auto& tf_opt = peer.send_transfers.at(idx);
assert(tf_opt.has_value()); assert(tf_opt.has_value());
auto& tf = tf_opt.value(); auto& tf = tf_opt.value();
tf.time_since_activity += time_delta; tf.time_since_activity += time_delta;
switch (tf.state) { if (tf.state == State::INIT_SENT) {
using State = Group::Peer::SendTransfer::State;
case State::INIT_SENT:
if (tf.time_since_activity >= init_retry_timeout_after) { if (tf.time_since_activity >= init_retry_timeout_after) {
if (tf.inits_sent >= 3) { if (tf.inits_sent >= 3) {
// delete, timed out 3 times // delete, timed out 3 times
std::cerr << "NGCFT1 warning: ft init timed out, deleting\n"; std::cerr << "NGCFT1 warning: sending ft init timed out, deleting\n";
dispatch( dispatch(
NGCFT1_Event::send_done, NGCFT1_Event::send_done,
Events::NGCFT1_send_done{ Events::NGCFT1_send_done{
@@ -39,55 +38,22 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
tf_opt.reset(); tf_opt.reset();
} else { } else {
// timed out, resend // timed out, resend
std::cerr << "NGCFT1 warning: ft init timed out, resending\n"; std::cerr << "NGCFT1 warning: sending ft init timed out, resending\n";
_neep.send_ft1_init(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size()); _neep.send_ft1_init(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size());
tf.inits_sent++; tf.inits_sent++;
tf.time_since_activity = 0.f; tf.time_since_activity = 0.f;
} }
} }
break; return;
case State::FINISHING: // we still have unacked packets } else if (tf.state == State::FINISHING || tf.state == State::SENDING) {
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
if (timeouts_set.count({idx, id})) {
if (can_packet_size >= int64_t(data.size())) {
_neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
} else {
#if 0 // too spammy
std::cerr << "NGCFT1 warning: no space to resend timedout\n";
#endif
}
}
});
if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) {
// no ack after 30sec, close ft
std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
dispatch(
NGCFT1_Event::send_done,
Events::NGCFT1_send_done{
group_number, peer_number,
static_cast<uint8_t>(idx),
}
);
// clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
peer.cca->onLoss({idx, id}, true);
timeouts_set.erase({idx, id});
});
tf_opt.reset();
}
break;
case State::SENDING: {
// first handle overall timeout (could otherwise do resends directly before, which is useless)
// timeout increases with active transfers (otherwise we could starve them) // timeout increases with active transfers (otherwise we could starve them)
if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) { if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) {
// no ack after 30sec, close ft // no ack after Xsec, close ft
if (tf.state == State::FINISHING) {
std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
} else {
std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting (ifc:" << peer.cca->inFlightCount() << ")\n"; std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting (ifc:" << peer.cca->inFlightCount() << ")\n";
}
dispatch( dispatch(
NGCFT1_Event::send_done, NGCFT1_Event::send_done,
Events::NGCFT1_send_done{ Events::NGCFT1_send_done{
@@ -99,25 +65,60 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
// clean up cca // clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) { tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
peer.cca->onLoss({idx, id}, true); peer.cca->onLoss({idx, id}, true);
timeouts_set.erase({idx, id});
}); });
tf_opt.reset(); tf_opt.reset();
//continue; // dangerous control flow }
return; return;
} }
// do resends // do send buffer and resending
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) { tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
if (can_packet_size >= int64_t(data.size()) && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) { time_since_activity += time_delta;
// TODO: can fail
_neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size()); if (tf.state != State::FINISHING && tf.state != State::SENDING) {
peer.cca->onLoss({idx, id}, false); return;
}
if (
time_since_activity >= peer.cca->getCurrentDelay() && // TODO: use OR instead?
timeouts_set.count({idx, id})
) {
if (can_packet_size >= int64_t(data.size() /*+ peer.cca->SEGMENT_OVERHEAD*/)) {
if (_neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size())) {
if (!peer.cca->onLoss({idx, id}, false)) { // might not be in cca
peer.cca->onSent({idx, id}, data.size());
}
time_since_activity = 0.f; time_since_activity = 0.f;
timeouts_set.erase({idx, id});
can_packet_size -= data.size(); can_packet_size -= data.size();
} else {
std::cerr << "NGCFT1 warning: failed to re-send packet (send queue full?)\n";
// signal ce (we did not call onLoss())
peer.cca->onCongestion();
can_packet_size = 0;
}
#if 0
} else {
std::cerr << "NGCFT1 warning: no space to resend timed-out\n";
#endif
}
timeouts_set.erase({idx, id});
} }
}); });
}
void NGCFT1::updateSendTransferPhase2(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, int64_t& can_packet_size) {
using State = Group::Peer::SendTransfer::State;
auto& tf_opt = peer.send_transfers.at(idx);
assert(tf_opt.has_value());
auto& tf = tf_opt.value();
if (tf.state != State::SENDING) {
return;
}
// if chunks in flight < window size (2) // if chunks in flight < window size (2)
while (can_packet_size > 0 && tf.file_size > 0) { while (can_packet_size > 0 && tf.file_size > 0) {
@@ -152,7 +153,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
if (sent) { if (sent) {
peer.cca->onSent({idx, seq_id}, chunk_size); peer.cca->onSent({idx, seq_id}, chunk_size);
} else { } else {
std::cerr << "NGCFT1: failed to send packet (queue full?) --------------\n"; std::cerr << "NGCFT1 warn: failed to send packet (send queue full?)\n";
peer.cca->onCongestion(); peer.cca->onCongestion();
can_packet_size = 0; can_packet_size = 0;
} }
@@ -160,14 +161,6 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
tf.file_size_current += chunk_size; tf.file_size_current += chunk_size;
can_packet_size -= chunk_size; can_packet_size -= chunk_size;
} }
}
break;
default: // invalid state, delete
std::cerr << "NGCFT1 error: ft in invalid state, deleting\n";
assert(false && "ft in invalid state");
tf_opt.reset();
return;
}
} }
bool NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) { bool NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
@@ -201,14 +194,17 @@ bool NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
int64_t can_packet_size {peer.cca->canSend(time_delta)}; // might get more space while iterating (time) int64_t can_packet_size {peer.cca->canSend(time_delta)}; // might get more space while iterating (time)
// get number current running transfers TODO: improve // resend and get number current running transfers
peer.active_send_transfers = 0; peer.active_send_transfers = 0;
for (const auto& it : peer.send_transfers) { for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) {
if (it.has_value()) { if (!peer.send_transfers.at(idx).has_value()) {
peer.active_send_transfers++; continue;
} }
peer.active_send_transfers++;
updateSendTransferPhase1(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size);
} }
if (can_packet_size > 0) {
// change iterate start position to not starve transfers in the back // change iterate start position to not starve transfers in the back
size_t iterated_count = 0; size_t iterated_count = 0;
bool last_send_found = false; bool last_send_found = false;
@@ -220,7 +216,8 @@ bool NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
peer.next_send_transfer_send_idx = idx; peer.next_send_transfer_send_idx = idx;
last_send_found = true; // only set once last_send_found = true; // only set once
} }
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size); updateSendTransferPhase2(time_delta, group_number, peer_number, peer, idx, can_packet_size);
}
} }
} }
} }

View File

@@ -209,7 +209,11 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
std::map<uint32_t, Group> groups; std::map<uint32_t, Group> groups;
protected: protected:
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size); // general update with timeouts and resending
void updateSendTransferPhase1(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
// does sending new data
void updateSendTransferPhase2(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, int64_t& can_packet_size);
bool iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer); bool iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
const CCAI* getPeerCCA(uint32_t group_number, uint32_t peer_number) const; const CCAI* getPeerCCA(uint32_t group_number, uint32_t peer_number) const;

View File

@@ -26,7 +26,6 @@ struct SendSequenceBuffer {
template<typename FN> template<typename FN>
void for_each(float time_delta, FN&& fn) { void for_each(float time_delta, FN&& fn) {
for (auto& [id, entry] : entries) { for (auto& [id, entry] : entries) {
entry.time_since_activity += time_delta;
fn(id, entry.data, entry.time_since_activity); fn(id, entry.data, entry.time_since_activity);
} }
} }