failing to send is now also a congestion event (hacky and only the first time we send data)

This commit is contained in:
Green Sky 2024-01-11 00:48:57 +01:00
parent f02b03da7c
commit 0df0760c06
No known key found for this signature in database
3 changed files with 76 additions and 54 deletions

View File

@ -28,6 +28,34 @@ void FlowOnly::updateWindow(void) {
_fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE); _fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
} }
void FlowOnly::updateCongestion(void) {
const auto tmp_window = getWindow();
// packet window * 0.3
// but atleast 4
int32_t max_consecutive_events = std::clamp<int32_t>(
(tmp_window/MAXIMUM_SEGMENT_DATA_SIZE) * 0.3f,
4,
50 // limit TODO: fix idle/time starved algo
);
// TODO: magic number
#if 0
std::cout << "NGC_FT1 Flow: pkg out of order"
<< " w:" << tmp_window
<< " pw:" << tmp_window/MAXIMUM_SEGMENT_DATA_SIZE
<< " coe:" << _consecutive_events
<< " mcoe:" << max_consecutive_events
<< "\n";
#endif
if (_consecutive_events > max_consecutive_events) {
//std::cout << "CONGESTION! NGC_FT1 flow: pkg out of order\n";
onCongestion();
// TODO: set _consecutive_events to zero?
}
}
float FlowOnly::getWindow(void) { float FlowOnly::getWindow(void) {
updateWindow(); updateWindow();
return _fwnd; return _fwnd;
@ -111,29 +139,7 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
_consecutive_events++; _consecutive_events++;
it->ignore = true; // only handle once it->ignore = true; // only handle once
const auto tmp_window = getWindow(); updateCongestion();
// packet window * 0.3
// but atleast 4
int32_t max_consecutive_events = std::clamp<int32_t>(
(tmp_window/MAXIMUM_SEGMENT_DATA_SIZE) * 0.3f,
4,
50 // limit TODO: fix idle/time starved algo
);
// TODO: magic number
#if 0
std::cout << "NGC_FT1 Flow: pkg out of order"
<< " w:" << tmp_window
<< " pw:" << tmp_window/MAXIMUM_SEGMENT_DATA_SIZE
<< " coe:" << _consecutive_events
<< " mcoe:" << max_consecutive_events
<< "\n";
#endif
if (_consecutive_events > max_consecutive_events) {
//std::cout << "CONGESTION! NGC_FT1 flow: pkg out of order\n";
onCongestion();
}
} else { } else {
// only mesure delay, if not a congestion // only mesure delay, if not a congestion
addRTT(now - it->timestamp); addRTT(now - it->timestamp);
@ -190,6 +196,14 @@ void FlowOnly::onLoss(SeqIDType seq, bool discard) {
it->ignore = true; it->ignore = true;
} }
// no ce, since this is usually after data arrived out-of-order/duplicate // usually after data arrived out-of-order/duplicate
if (!it->ignore) {
it->ignore = true; // only handle once
//_consecutive_events++;
//updateCongestion();
// this is usually a safe indicator for congestion/maxed connection
onCongestion();
}
} }

View File

@ -57,6 +57,9 @@ struct FlowOnly : public CCAI {
virtual void onCongestion(void) {}; virtual void onCongestion(void) {};
// internal logic, calls the onCongestion() event
void updateCongestion(void);
public: // api public: // api
FlowOnly(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {} FlowOnly(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {}

View File

@ -186,6 +186,30 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
} }
//break; //break;
return; return;
case State::FINISHING: // we still have unacked packets
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) {
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
}
});
if (tf.time_since_activity >= sending_give_up_after) {
// no ack after 30sec, close ft
// TODO: notify app
std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
// clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
peer.cca->onLoss({idx, id}, true);
timeouts_set.erase({idx, id});
});
tf_opt.reset();
}
break;
case State::SENDING: { case State::SENDING: {
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) { tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) { if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) {
@ -249,42 +273,21 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
); );
uint16_t seq_id = tf.ssb.add(std::move(new_data)); uint16_t seq_id = tf.ssb.add(std::move(new_data));
sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size()); const bool sent = sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
if (sent) {
peer.cca->onSent({idx, seq_id}, chunk_size); peer.cca->onSent({idx, seq_id}, chunk_size);
} else {
#if defined(EXTRA_LOGGING) && EXTRA_LOGGING == 1 std::cerr << "NGCFT1: failed to send packet (queue full?) --------------\n";
fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id); peer.cca->onLoss({idx, seq_id}, false); // HACK: fake congestion event
#endif // TODO: onCongestion
can_packet_size = 0;
}
tf.file_size_current += chunk_size; tf.file_size_current += chunk_size;
can_packet_size -= chunk_size; can_packet_size -= chunk_size;
} }
} }
break; break;
case State::FINISHING: // we still have unacked packets
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) {
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
}
});
if (tf.time_since_activity >= sending_give_up_after) {
// no ack after 30sec, close ft
// TODO: notify app
std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
// clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
peer.cca->onLoss({idx, id}, true);
timeouts_set.erase({idx, id});
});
tf_opt.reset();
}
break;
default: // invalid state, delete default: // invalid state, delete
std::cerr << "NGCFT1 error: ft in invalid state, deleting\n"; std::cerr << "NGCFT1 error: ft in invalid state, deleting\n";
tf_opt.reset(); tf_opt.reset();
@ -362,6 +365,8 @@ float NGCFT1::iterate(float time_delta) {
} }
if (transfer_in_progress) { if (transfer_in_progress) {
// ~15ms for up to 1mb/s
// ~5ms for up to 4mb/s
return 0.005f; // 5ms return 0.005f; // 5ms
} else { } else {
return 1.f; // once a sec might be too little return 1.f; // once a sec might be too little