more fixes
This commit is contained in:
parent
a1a9bf886a
commit
ba8befbb2d
@ -46,10 +46,13 @@ struct CCAI {
|
|||||||
// return the current believed window in bytes of how much data can be inflight,
|
// return the current believed window in bytes of how much data can be inflight,
|
||||||
//virtual float getCWnD(void) const = 0;
|
//virtual float getCWnD(void) const = 0;
|
||||||
|
|
||||||
|
// returns current rtt/delay
|
||||||
|
virtual float getCurrentDelay(void) const = 0;
|
||||||
|
|
||||||
// TODO: api for how much data we should send
|
// TODO: api for how much data we should send
|
||||||
// take time since last sent into account
|
// take time since last sent into account
|
||||||
// respect max_byterate_allowed
|
// respect max_byterate_allowed
|
||||||
virtual size_t canSend(void) = 0;
|
virtual int64_t canSend(void) = 0;
|
||||||
|
|
||||||
// get the list of timed out seq_ids
|
// get the list of timed out seq_ids
|
||||||
virtual std::vector<SeqIDType> getTimeouts(void) const = 0;
|
virtual std::vector<SeqIDType> getTimeouts(void) const = 0;
|
||||||
|
@ -35,16 +35,23 @@ float CUBIC::getCWnD(void) const {
|
|||||||
void CUBIC::onCongestion(void) {
|
void CUBIC::onCongestion(void) {
|
||||||
if (getTimeNow() - _time_point_reduction >= getCurrentDelay()*4.f) {
|
if (getTimeNow() - _time_point_reduction >= getCurrentDelay()*4.f) {
|
||||||
const auto current_cwnd = getCWnD();
|
const auto current_cwnd = getCWnD();
|
||||||
|
const auto tmp_old_tp = _time_point_reduction;
|
||||||
_time_point_reduction = getTimeNow();
|
_time_point_reduction = getTimeNow();
|
||||||
_window_max = current_cwnd * BETA;
|
_window_max = current_cwnd * BETA;
|
||||||
_window_max = std::max(_window_max, 2.*MAXIMUM_SEGMENT_SIZE);
|
_window_max = std::max(_window_max, 2.*MAXIMUM_SEGMENT_SIZE);
|
||||||
|
|
||||||
//std::cout << "CONGESTION! cwnd:" << current_cwnd << "\n";
|
#if 1
|
||||||
std::cout << "CONGESTION! cwnd_max:" << _window_max << "\n";
|
std::cout << "CONGESTION!"
|
||||||
|
<< " cwnd_max:" << _window_max
|
||||||
|
<< " pts:" << tmp_old_tp
|
||||||
|
<< " rtt:" << getCurrentDelay()
|
||||||
|
<< "\n"
|
||||||
|
;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t CUBIC::canSend(void) {
|
int64_t CUBIC::canSend(void) {
|
||||||
const auto fspace_pkgs = FlowOnly::canSend();
|
const auto fspace_pkgs = FlowOnly::canSend();
|
||||||
|
|
||||||
if (fspace_pkgs == 0u) {
|
if (fspace_pkgs == 0u) {
|
||||||
@ -57,7 +64,7 @@ size_t CUBIC::canSend(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// limit to whole packets
|
// limit to whole packets
|
||||||
size_t cspace_pkgs = std::floor(cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
int64_t cspace_pkgs = (cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||||
|
|
||||||
return std::min(cspace_pkgs, fspace_pkgs);
|
return std::min(cspace_pkgs, fspace_pkgs);
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,7 @@ struct CUBIC : public FlowOnly {
|
|||||||
// TODO: api for how much data we should send
|
// TODO: api for how much data we should send
|
||||||
// take time since last sent into account
|
// take time since last sent into account
|
||||||
// respect max_byterate_allowed
|
// respect max_byterate_allowed
|
||||||
size_t canSend(void) override;
|
int64_t canSend(void) override;
|
||||||
|
|
||||||
// get the list of timed out seq_ids
|
// get the list of timed out seq_ids
|
||||||
//std::vector<SeqIDType> getTimeouts(void) const override;
|
//std::vector<SeqIDType> getTimeouts(void) const override;
|
||||||
|
@ -28,7 +28,7 @@ void FlowOnly::updateWindow(void) {
|
|||||||
_fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
|
_fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t FlowOnly::canSend(void) {
|
int64_t FlowOnly::canSend(void) {
|
||||||
if (_in_flight.empty()) {
|
if (_in_flight.empty()) {
|
||||||
assert(_in_flight_bytes == 0);
|
assert(_in_flight_bytes == 0);
|
||||||
return MAXIMUM_SEGMENT_DATA_SIZE;
|
return MAXIMUM_SEGMENT_DATA_SIZE;
|
||||||
@ -42,10 +42,7 @@ size_t FlowOnly::canSend(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// limit to whole packets
|
// limit to whole packets
|
||||||
size_t space = std::floor(fspace / MAXIMUM_SEGMENT_DATA_SIZE)
|
return (fspace / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||||
* MAXIMUM_SEGMENT_DATA_SIZE;
|
|
||||||
|
|
||||||
return space;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
|
std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
|
||||||
@ -147,16 +144,10 @@ void FlowOnly::onLoss(SeqIDType seq, bool discard) {
|
|||||||
assert(_in_flight_bytes >= 0);
|
assert(_in_flight_bytes >= 0);
|
||||||
_in_flight.erase(it);
|
_in_flight.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: reset timestamp?
|
// TODO: reset timestamp?
|
||||||
|
// and not take into rtt
|
||||||
|
|
||||||
#if 0 // temporarily disable ce for timeout
|
// no ce, since this is usually after data arrived out-of-order/duplicate
|
||||||
// at most once per rtt?
|
|
||||||
// TODO: use delay at event instead
|
|
||||||
if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
|
|
||||||
_recently_lost_data = true;
|
|
||||||
_last_congestion_event = getTimeNow();
|
|
||||||
_last_congestion_rtt = getCurrentDelay();
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ struct FlowOnly : public CCAI {
|
|||||||
|
|
||||||
// moving avg over the last few delay samples
|
// moving avg over the last few delay samples
|
||||||
// VERY sensitive to bundling acks
|
// VERY sensitive to bundling acks
|
||||||
float getCurrentDelay(void) const;
|
float getCurrentDelay(void) const override;
|
||||||
|
|
||||||
void addRTT(float new_delay);
|
void addRTT(float new_delay);
|
||||||
|
|
||||||
@ -59,7 +59,7 @@ struct FlowOnly : public CCAI {
|
|||||||
// TODO: api for how much data we should send
|
// TODO: api for how much data we should send
|
||||||
// take time since last sent into account
|
// take time since last sent into account
|
||||||
// respect max_byterate_allowed
|
// respect max_byterate_allowed
|
||||||
size_t canSend(void) override;
|
int64_t canSend(void) override;
|
||||||
|
|
||||||
// get the list of timed out seq_ids
|
// get the list of timed out seq_ids
|
||||||
std::vector<SeqIDType> getTimeouts(void) const override;
|
std::vector<SeqIDType> getTimeouts(void) const override;
|
||||||
|
@ -19,7 +19,7 @@ LEDBAT::LEDBAT(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_siz
|
|||||||
_time_start_offset = clock::now();
|
_time_start_offset = clock::now();
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t LEDBAT::canSend(void) {
|
int64_t LEDBAT::canSend(void) {
|
||||||
if (_in_flight.empty()) {
|
if (_in_flight.empty()) {
|
||||||
return MAXIMUM_SEGMENT_DATA_SIZE;
|
return MAXIMUM_SEGMENT_DATA_SIZE;
|
||||||
}
|
}
|
||||||
@ -34,9 +34,7 @@ size_t LEDBAT::canSend(void) {
|
|||||||
return 0u;
|
return 0u;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t space = std::ceil(std::min<float>(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
return std::ceil(std::min<float>(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||||
|
|
||||||
return space;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<LEDBAT::SeqIDType> LEDBAT::getTimeouts(void) const {
|
std::vector<LEDBAT::SeqIDType> LEDBAT::getTimeouts(void) const {
|
||||||
|
@ -61,7 +61,7 @@ struct LEDBAT : public CCAI{
|
|||||||
// TODO: api for how much data we should send
|
// TODO: api for how much data we should send
|
||||||
// take time since last sent into account
|
// take time since last sent into account
|
||||||
// respect max_byterate_allowed
|
// respect max_byterate_allowed
|
||||||
size_t canSend(void) override;
|
int64_t canSend(void) override;
|
||||||
|
|
||||||
// get the list of timed out seq_ids
|
// get the list of timed out seq_ids
|
||||||
std::vector<SeqIDType> getTimeouts(void) const override;
|
std::vector<SeqIDType> getTimeouts(void) const override;
|
||||||
@ -86,7 +86,7 @@ struct LEDBAT : public CCAI{
|
|||||||
|
|
||||||
// moving avg over the last few delay samples
|
// moving avg over the last few delay samples
|
||||||
// VERY sensitive to bundling acks
|
// VERY sensitive to bundling acks
|
||||||
float getCurrentDelay(void) const;
|
float getCurrentDelay(void) const override;
|
||||||
|
|
||||||
void addRTT(float new_delay);
|
void addRTT(float new_delay);
|
||||||
|
|
||||||
|
@ -143,7 +143,7 @@ bool NGCFT1::sendPKG_FT1_MESSAGE(
|
|||||||
return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK;
|
return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set) {
|
void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size) {
|
||||||
auto& tf_opt = peer.send_transfers.at(idx);
|
auto& tf_opt = peer.send_transfers.at(idx);
|
||||||
assert(tf_opt.has_value());
|
assert(tf_opt.has_value());
|
||||||
auto& tf = tf_opt.value();
|
auto& tf = tf_opt.value();
|
||||||
@ -177,14 +177,13 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
|||||||
return;
|
return;
|
||||||
case State::SENDING: {
|
case State::SENDING: {
|
||||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||||
// no ack after 5 sec -> resend
|
if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) {
|
||||||
//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
|
|
||||||
if (timeouts_set.count({idx, id})) {
|
|
||||||
// TODO: can fail
|
// TODO: can fail
|
||||||
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
|
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
|
||||||
peer.cca->onLoss({idx, id}, false);
|
peer.cca->onLoss({idx, id}, false);
|
||||||
time_since_activity = 0.f;
|
time_since_activity = 0.f;
|
||||||
timeouts_set.erase({idx, id});
|
timeouts_set.erase({idx, id});
|
||||||
|
can_packet_size -= data.size();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -211,21 +210,10 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
|||||||
}
|
}
|
||||||
|
|
||||||
// if chunks in flight < window size (2)
|
// if chunks in flight < window size (2)
|
||||||
//while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) {
|
|
||||||
int64_t can_packet_size {static_cast<int64_t>(peer.cca->canSend())};
|
|
||||||
//if (can_packet_size) {
|
|
||||||
//std::cerr << "FT: can_packet_size: " << can_packet_size;
|
|
||||||
//}
|
|
||||||
size_t count {0};
|
|
||||||
while (can_packet_size > 0 && tf.file_size > 0) {
|
while (can_packet_size > 0 && tf.file_size > 0) {
|
||||||
std::vector<uint8_t> new_data;
|
std::vector<uint8_t> new_data;
|
||||||
|
|
||||||
// TODO: parameterize packet size? -> only if JF increases lossy packet size >:)
|
|
||||||
//size_t chunk_size = std::min<size_t>(496u, tf.file_size - tf.file_size_current);
|
|
||||||
//size_t chunk_size = std::min<size_t>(can_packet_size, tf.file_size - tf.file_size_current);
|
|
||||||
size_t chunk_size = std::min<size_t>({
|
size_t chunk_size = std::min<size_t>({
|
||||||
//496u,
|
|
||||||
//996u,
|
|
||||||
peer.cca->MAXIMUM_SEGMENT_DATA_SIZE,
|
peer.cca->MAXIMUM_SEGMENT_DATA_SIZE,
|
||||||
static_cast<size_t>(can_packet_size),
|
static_cast<size_t>(can_packet_size),
|
||||||
tf.file_size - tf.file_size_current
|
tf.file_size - tf.file_size_current
|
||||||
@ -237,14 +225,6 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
|||||||
|
|
||||||
new_data.resize(chunk_size);
|
new_data.resize(chunk_size);
|
||||||
|
|
||||||
//ngc_ft1_ctx->cb_send_data[tf.file_kind](
|
|
||||||
//tox,
|
|
||||||
//group_number, peer_number,
|
|
||||||
//idx,
|
|
||||||
//tf.file_size_current,
|
|
||||||
//new_data.data(), new_data.size(),
|
|
||||||
//ngc_ft1_ctx->ud_send_data.count(tf.file_kind) ? ngc_ft1_ctx->ud_send_data.at(tf.file_kind) : nullptr
|
|
||||||
//);
|
|
||||||
assert(idx <= 0xffu);
|
assert(idx <= 0xffu);
|
||||||
// TODO: check return value
|
// TODO: check return value
|
||||||
dispatch(
|
dispatch(
|
||||||
@ -267,22 +247,17 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
|||||||
|
|
||||||
tf.file_size_current += chunk_size;
|
tf.file_size_current += chunk_size;
|
||||||
can_packet_size -= chunk_size;
|
can_packet_size -= chunk_size;
|
||||||
count++;
|
|
||||||
}
|
}
|
||||||
//if (count) {
|
|
||||||
//std::cerr << " split over " << count << "\n";
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case State::FINISHING: // we still have unacked packets
|
case State::FINISHING: // we still have unacked packets
|
||||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||||
// no ack after 5 sec -> resend
|
if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) {
|
||||||
//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
|
|
||||||
if (timeouts_set.count({idx, id})) {
|
|
||||||
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
|
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
|
||||||
peer.cca->onLoss({idx, id}, false);
|
peer.cca->onLoss({idx, id}, false);
|
||||||
time_since_activity = 0.f;
|
time_since_activity = 0.f;
|
||||||
timeouts_set.erase({idx, id});
|
timeouts_set.erase({idx, id});
|
||||||
|
can_packet_size -= data.size();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if (tf.time_since_activity >= sending_give_up_after) {
|
if (tf.time_since_activity >= sending_give_up_after) {
|
||||||
@ -311,9 +286,11 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
|
|||||||
auto timeouts = peer.cca->getTimeouts();
|
auto timeouts = peer.cca->getTimeouts();
|
||||||
std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
|
std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
|
||||||
|
|
||||||
|
|
||||||
for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) {
|
for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) {
|
||||||
if (peer.send_transfers.at(idx).has_value()) {
|
if (peer.send_transfers.at(idx).has_value()) {
|
||||||
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set);
|
int64_t can_packet_size {peer.cca->canSend()}; // might get more space while iterating (time)
|
||||||
|
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -201,7 +201,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
|
|||||||
bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size);
|
bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size);
|
||||||
bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size);
|
bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size);
|
||||||
|
|
||||||
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set);
|
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
|
||||||
void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
|
void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
Loading…
Reference in New Issue
Block a user