fix missing virtual destructor and scale tranfer timeout with concurency

This commit is contained in:
Green Sky 2024-03-05 16:48:58 +01:00
parent bccd04316a
commit 5fd1f2ab84
No known key found for this signature in database
6 changed files with 32 additions and 17 deletions

View File

@ -48,13 +48,12 @@ struct CCAI {
public: // api public: // api
CCAI(size_t maximum_segment_data_size) : MAXIMUM_SEGMENT_DATA_SIZE(maximum_segment_data_size) {} CCAI(size_t maximum_segment_data_size) : MAXIMUM_SEGMENT_DATA_SIZE(maximum_segment_data_size) {}
virtual ~CCAI(void) {}
// return the current believed window in bytes of how much data can be inflight,
//virtual float getCWnD(void) const = 0;
// returns current rtt/delay // returns current rtt/delay
virtual float getCurrentDelay(void) const = 0; virtual float getCurrentDelay(void) const = 0;
// return the current believed window in bytes of how much data can be inflight,
virtual float getWindow(void) = 0; virtual float getWindow(void) = 0;
// TODO: api for how much data we should send // TODO: api for how much data we should send

View File

@ -32,6 +32,7 @@ struct CUBIC : public FlowOnly {
public: // api public: // api
CUBIC(size_t maximum_segment_data_size) : FlowOnly(maximum_segment_data_size) {} CUBIC(size_t maximum_segment_data_size) : FlowOnly(maximum_segment_data_size) {}
virtual ~CUBIC(void) {}
float getWindow(void) override; float getWindow(void) override;

View File

@ -62,6 +62,7 @@ struct FlowOnly : public CCAI {
public: // api public: // api
FlowOnly(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {} FlowOnly(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {}
virtual ~FlowOnly(void) {}
// TODO: api for how much data we should send // TODO: api for how much data we should send
// take time since last sent into account // take time since last sent into account

View File

@ -49,6 +49,7 @@ struct LEDBAT : public CCAI {
public: public:
LEDBAT(size_t maximum_segment_data_size); LEDBAT(size_t maximum_segment_data_size);
virtual ~LEDBAT(void) {}
// return the current believed window in bytes of how much data can be inflight, // return the current believed window in bytes of how much data can be inflight,
// without overstepping the delay requirement // without overstepping the delay requirement

View File

@ -211,18 +211,9 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
} }
break; break;
case State::SENDING: { case State::SENDING: {
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) { // first handle overall timeout (could otherwise do resends directly before, which is useless)
if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) { // timeout increases with active transfers (otherwise we could starve them)
// TODO: can fail if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) {
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
}
});
if (tf.time_since_activity >= sending_give_up_after) {
// no ack after 30sec, close ft // no ack after 30sec, close ft
std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting\n"; std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting\n";
dispatch( dispatch(
@ -244,6 +235,18 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
return; return;
} }
// do resends
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) {
// TODO: can fail
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
}
});
// if chunks in flight < window size (2) // if chunks in flight < window size (2)
while (can_packet_size > 0 && tf.file_size > 0) { while (can_packet_size > 0 && tf.file_size > 0) {
std::vector<uint8_t> new_data; std::vector<uint8_t> new_data;
@ -303,6 +306,14 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
int64_t can_packet_size {peer.cca->canSend(time_delta)}; // might get more space while iterating (time) int64_t can_packet_size {peer.cca->canSend(time_delta)}; // might get more space while iterating (time)
// get number current running transfers TODO: improve
peer.active_send_transfers = 0;
for (const auto& it : peer.send_transfers) {
if (it.has_value()) {
peer.active_send_transfers++;
}
}
// change iterat start position to not starve transfers in the back // change iterat start position to not starve transfers in the back
size_t iterated_count = 0; size_t iterated_count = 0;
bool last_send_found = false; bool last_send_found = false;

View File

@ -137,8 +137,8 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
// TODO: config // TODO: config
size_t acks_per_packet {3u}; // 3 size_t acks_per_packet {3u}; // 3
float init_retry_timeout_after {5.f}; // 10sec float init_retry_timeout_after {4.f};
float sending_give_up_after {30.f}; // 30sec float sending_give_up_after {15.f}; // 30sec (per active transfer)
struct Group { struct Group {
@ -193,6 +193,8 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
std::array<std::optional<SendTransfer>, 256> send_transfers; std::array<std::optional<SendTransfer>, 256> send_transfers;
size_t next_send_transfer_idx {0}; // next id will be 0 size_t next_send_transfer_idx {0}; // next id will be 0
size_t next_send_transfer_send_idx {0}; size_t next_send_transfer_send_idx {0};
size_t active_send_transfers {0};
}; };
std::map<uint32_t, Peer> peers; std::map<uint32_t, Peer> peers;
}; };