Compare commits
9 Commits
cca_rework
...
27fd9e688b
Author | SHA1 | Date | |
---|---|---|---|
27fd9e688b | |||
f28e79dcbc | |||
7af5fda0a6 | |||
f91780c602 | |||
1e6929c93b | |||
81a353570b | |||
070585ab3d | |||
ba8befbb2d | |||
a1a9bf886a |
@ -46,10 +46,15 @@ struct CCAI {
|
||||
// return the current believed window in bytes of how much data can be inflight,
|
||||
//virtual float getCWnD(void) const = 0;
|
||||
|
||||
// returns current rtt/delay
|
||||
virtual float getCurrentDelay(void) const = 0;
|
||||
|
||||
virtual float getWindow(void) = 0;
|
||||
|
||||
// TODO: api for how much data we should send
|
||||
// take time since last sent into account
|
||||
// respect max_byterate_allowed
|
||||
virtual size_t canSend(void) = 0;
|
||||
virtual int64_t canSend(void) = 0;
|
||||
|
||||
// get the list of timed out seq_ids
|
||||
virtual std::vector<SeqIDType> getTimeouts(void) const = 0;
|
||||
|
@ -33,16 +33,35 @@ float CUBIC::getCWnD(void) const {
|
||||
}
|
||||
|
||||
void CUBIC::onCongestion(void) {
|
||||
if (getTimeNow() - _time_point_reduction >= getCurrentDelay()) {
|
||||
const auto current_cwnd = getCWnD();
|
||||
_time_point_reduction = getTimeNow();
|
||||
_window_max = current_cwnd;
|
||||
if (getTimeNow() - _time_point_reduction >= getCurrentDelay()*4.f) {
|
||||
const auto tmp_old_tp = getTimeNow() - _time_point_reduction;
|
||||
|
||||
std::cout << "CONGESTION! cwnd:" << current_cwnd << "\n";
|
||||
const auto current_cwnd = getCWnD();
|
||||
const auto current_wnd = getWindow(); // respects cwnd and fwnd
|
||||
|
||||
_time_point_reduction = getTimeNow();
|
||||
//_window_max = current_cwnd * BETA;
|
||||
_window_max = current_wnd * BETA;
|
||||
_window_max = std::max(_window_max, 2.*MAXIMUM_SEGMENT_SIZE);
|
||||
|
||||
#if 1
|
||||
std::cout << "----CONGESTION!"
|
||||
<< " cwnd:" << current_cwnd
|
||||
<< " wnd:" << current_wnd
|
||||
<< " cwnd_max:" << _window_max
|
||||
<< " pts:" << tmp_old_tp
|
||||
<< " rtt:" << getCurrentDelay()
|
||||
<< "\n"
|
||||
;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
size_t CUBIC::canSend(void) {
|
||||
float CUBIC::getWindow(void) {
|
||||
return std::min<float>(getCWnD(), FlowOnly::getWindow());
|
||||
}
|
||||
|
||||
int64_t CUBIC::canSend(void) {
|
||||
const auto fspace_pkgs = FlowOnly::canSend();
|
||||
|
||||
if (fspace_pkgs == 0u) {
|
||||
@ -55,7 +74,7 @@ size_t CUBIC::canSend(void) {
|
||||
}
|
||||
|
||||
// limit to whole packets
|
||||
size_t cspace_pkgs = std::floor(cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
int64_t cspace_pkgs = (cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
|
||||
return std::min(cspace_pkgs, fspace_pkgs);
|
||||
}
|
||||
|
@ -8,7 +8,8 @@ struct CUBIC : public FlowOnly {
|
||||
//using clock = std::chrono::steady_clock;
|
||||
|
||||
public: // config
|
||||
static constexpr float BETA {0.7f};
|
||||
//static constexpr float BETA {0.7f};
|
||||
static constexpr float BETA {0.8f};
|
||||
static constexpr float SCALING_CONSTANT {0.4f};
|
||||
static constexpr float RTT_EMA_ALPHA = 0.1f; // 0.1 is very smooth, might need more
|
||||
|
||||
@ -32,10 +33,12 @@ struct CUBIC : public FlowOnly {
|
||||
public: // api
|
||||
CUBIC(size_t maximum_segment_data_size) : FlowOnly(maximum_segment_data_size) {}
|
||||
|
||||
float getWindow(void) override;
|
||||
|
||||
// TODO: api for how much data we should send
|
||||
// take time since last sent into account
|
||||
// respect max_byterate_allowed
|
||||
size_t canSend(void) override;
|
||||
int64_t canSend(void) override;
|
||||
|
||||
// get the list of timed out seq_ids
|
||||
//std::vector<SeqIDType> getTimeouts(void) const override;
|
||||
|
@ -10,6 +10,11 @@ float FlowOnly::getCurrentDelay(void) const {
|
||||
}
|
||||
|
||||
void FlowOnly::addRTT(float new_delay) {
|
||||
if (new_delay > _rtt_ema * RTT_UP_MAX) {
|
||||
// too large a jump up, to be taken into account
|
||||
return;
|
||||
}
|
||||
|
||||
// lerp(new_delay, rtt_ema, 0.1)
|
||||
_rtt_ema = RTT_EMA_ALPHA * new_delay + (1.f - RTT_EMA_ALPHA) * _rtt_ema;
|
||||
}
|
||||
@ -23,7 +28,12 @@ void FlowOnly::updateWindow(void) {
|
||||
_fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
|
||||
}
|
||||
|
||||
size_t FlowOnly::canSend(void) {
|
||||
float FlowOnly::getWindow(void) {
|
||||
updateWindow();
|
||||
return _fwnd;
|
||||
}
|
||||
|
||||
int64_t FlowOnly::canSend(void) {
|
||||
if (_in_flight.empty()) {
|
||||
assert(_in_flight_bytes == 0);
|
||||
return MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
@ -37,10 +47,7 @@ size_t FlowOnly::canSend(void) {
|
||||
}
|
||||
|
||||
// limit to whole packets
|
||||
size_t space = std::floor(fspace / MAXIMUM_SEGMENT_DATA_SIZE)
|
||||
* MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
|
||||
return space;
|
||||
return (fspace / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
}
|
||||
|
||||
std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
|
||||
@ -49,7 +56,7 @@ std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
|
||||
// after 3 rtt delay, we trigger timeout
|
||||
const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;
|
||||
|
||||
for (const auto& [seq, time_stamp, size] : _in_flight) {
|
||||
for (const auto& [seq, time_stamp, size, _] : _in_flight) {
|
||||
if (now_adjusted > time_stamp) {
|
||||
list.push_back(seq);
|
||||
}
|
||||
@ -61,11 +68,17 @@ std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
|
||||
void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
|
||||
if constexpr (true) {
|
||||
for (const auto& it : _in_flight) {
|
||||
assert(std::get<0>(it) != seq);
|
||||
assert(it.id != seq);
|
||||
}
|
||||
}
|
||||
|
||||
_in_flight.push_back({seq, getTimeNow(), data_size + SEGMENT_OVERHEAD});
|
||||
_in_flight.push_back(
|
||||
FlyingBunch{
|
||||
seq,
|
||||
static_cast<float>(getTimeNow()),
|
||||
data_size + SEGMENT_OVERHEAD
|
||||
}
|
||||
);
|
||||
_in_flight_bytes += data_size + SEGMENT_OVERHEAD;
|
||||
//_recently_sent_bytes += data_size + SEGMENT_OVERHEAD;
|
||||
}
|
||||
@ -82,24 +95,47 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
|
||||
{ // skip in ack is congestion event
|
||||
// 1. look at primary ack of packet
|
||||
auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq = seqs.front()](const auto& v) -> bool {
|
||||
return std::get<0>(v) == seq;
|
||||
return v.id == seq;
|
||||
});
|
||||
if (it != _in_flight.end()) {
|
||||
if (it != _in_flight.begin()) {
|
||||
if (it != _in_flight.end() && !it->ignore) {
|
||||
// find first non ignore, it should be the expected
|
||||
auto first_it = std::find_if_not(_in_flight.cbegin(), _in_flight.cend(), [](const auto& v) -> bool { return v.ignore; });
|
||||
|
||||
if (first_it != _in_flight.cend() && it != first_it) {
|
||||
// not next expected seq -> skip detected
|
||||
|
||||
std::cout << "CONGESTION out of order\n";
|
||||
onCongestion();
|
||||
//if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
|
||||
//_recently_lost_data = true;
|
||||
//_last_congestion_event = getTimeNow();
|
||||
//_last_congestion_rtt = getCurrentDelay();
|
||||
//}
|
||||
_consecutive_events++;
|
||||
it->ignore = true; // only handle once
|
||||
|
||||
const auto tmp_window = getWindow();
|
||||
// packet window * 0.3
|
||||
// but atleast 4
|
||||
int32_t max_consecutive_events = std::clamp<int32_t>(
|
||||
(tmp_window/MAXIMUM_SEGMENT_DATA_SIZE) * 0.3f,
|
||||
4,
|
||||
50 // limit TODO: fix idle/time starved algo
|
||||
);
|
||||
// TODO: magic number
|
||||
|
||||
#if 0
|
||||
std::cout << "NGC_FT1 Flow: pkg out of order"
|
||||
<< " w:" << tmp_window
|
||||
<< " pw:" << tmp_window/MAXIMUM_SEGMENT_DATA_SIZE
|
||||
<< " coe:" << _consecutive_events
|
||||
<< " mcoe:" << max_consecutive_events
|
||||
<< "\n";
|
||||
#endif
|
||||
|
||||
if (_consecutive_events > max_consecutive_events) {
|
||||
//std::cout << "CONGESTION! NGC_FT1 flow: pkg out of order\n";
|
||||
onCongestion();
|
||||
}
|
||||
} else {
|
||||
// only mesure delay, if not a congestion
|
||||
addRTT(now - std::get<1>(*it));
|
||||
addRTT(now - it->timestamp);
|
||||
_consecutive_events = 0;
|
||||
}
|
||||
} else {
|
||||
} else { // TOOD: if ! ignore too
|
||||
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
#if 0
|
||||
// assume we got a duplicated packet
|
||||
@ -111,14 +147,14 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
|
||||
|
||||
for (const auto& seq : seqs) {
|
||||
auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
|
||||
return std::get<0>(v) == seq;
|
||||
return v.id == seq;
|
||||
});
|
||||
|
||||
if (it == _in_flight.end()) {
|
||||
continue; // not found, ignore
|
||||
} else {
|
||||
//most_recent = std::max(most_recent, std::get<1>(*it));
|
||||
_in_flight_bytes -= std::get<2>(*it);
|
||||
_in_flight_bytes -= it->bytes;
|
||||
assert(_in_flight_bytes >= 0);
|
||||
//_recently_acked_data += std::get<2>(*it);
|
||||
_in_flight.erase(it);
|
||||
@ -128,8 +164,8 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
|
||||
|
||||
void FlowOnly::onLoss(SeqIDType seq, bool discard) {
|
||||
auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
|
||||
assert(!std::isnan(std::get<1>(v)));
|
||||
return std::get<0>(v) == seq;
|
||||
assert(!std::isnan(v.timestamp));
|
||||
return v.id == seq;
|
||||
});
|
||||
|
||||
if (it == _in_flight.end()) {
|
||||
@ -137,24 +173,19 @@ void FlowOnly::onLoss(SeqIDType seq, bool discard) {
|
||||
return; // not found, ignore ??
|
||||
}
|
||||
|
||||
std::cerr << "FLOW loss\n";
|
||||
//std::cerr << "FLOW loss\n";
|
||||
|
||||
// "if data lost is not to be retransmitted"
|
||||
if (discard) {
|
||||
_in_flight_bytes -= std::get<2>(*it);
|
||||
_in_flight_bytes -= it->bytes;
|
||||
assert(_in_flight_bytes >= 0);
|
||||
_in_flight.erase(it);
|
||||
} else {
|
||||
// and not take into rtt
|
||||
it->timestamp = getTimeNow();
|
||||
it->ignore = true;
|
||||
}
|
||||
// TODO: reset timestamp?
|
||||
|
||||
#if 0 // temporarily disable ce for timeout
|
||||
// at most once per rtt?
|
||||
// TODO: use delay at event instead
|
||||
if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
|
||||
_recently_lost_data = true;
|
||||
_last_congestion_event = getTimeNow();
|
||||
_last_congestion_rtt = getCurrentDelay();
|
||||
}
|
||||
#endif
|
||||
// no ce, since this is usually after data arrived out-of-order/duplicate
|
||||
}
|
||||
|
||||
|
@ -11,7 +11,8 @@ struct FlowOnly : public CCAI {
|
||||
using clock = std::chrono::steady_clock;
|
||||
|
||||
public: // config
|
||||
static constexpr float RTT_EMA_ALPHA = 0.1f; // might need over time
|
||||
static constexpr float RTT_EMA_ALPHA = 0.001f; // might need change over time
|
||||
static constexpr float RTT_UP_MAX = 3.0f; // how much larger a delay can be to be taken into account
|
||||
static constexpr float RTT_MAX = 2.f; // 2 sec is probably too much
|
||||
|
||||
//float max_byterate_allowed {100.f*1024*1024}; // 100MiB/s
|
||||
@ -30,9 +31,19 @@ struct FlowOnly : public CCAI {
|
||||
float _rtt_ema {0.1f};
|
||||
|
||||
// list of sequence ids and timestamps of when they where sent (and payload size)
|
||||
std::vector<std::tuple<SeqIDType, float, size_t>> _in_flight;
|
||||
struct FlyingBunch {
|
||||
SeqIDType id;
|
||||
float timestamp;
|
||||
size_t bytes;
|
||||
|
||||
// set to true if counted as ce or resent due to timeout
|
||||
bool ignore {false};
|
||||
};
|
||||
std::vector<FlyingBunch> _in_flight;
|
||||
int64_t _in_flight_bytes {0};
|
||||
|
||||
int32_t _consecutive_events {0};
|
||||
|
||||
clock::time_point _time_start_offset;
|
||||
|
||||
protected:
|
||||
@ -44,7 +55,9 @@ struct FlowOnly : public CCAI {
|
||||
|
||||
// moving avg over the last few delay samples
|
||||
// VERY sensitive to bundling acks
|
||||
float getCurrentDelay(void) const;
|
||||
float getCurrentDelay(void) const override;
|
||||
|
||||
float getWindow(void) override;
|
||||
|
||||
void addRTT(float new_delay);
|
||||
|
||||
@ -58,7 +71,7 @@ struct FlowOnly : public CCAI {
|
||||
// TODO: api for how much data we should send
|
||||
// take time since last sent into account
|
||||
// respect max_byterate_allowed
|
||||
size_t canSend(void) override;
|
||||
int64_t canSend(void) override;
|
||||
|
||||
// get the list of timed out seq_ids
|
||||
std::vector<SeqIDType> getTimeouts(void) const override;
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <deque>
|
||||
#include <cstdint>
|
||||
#include <cassert>
|
||||
#include <tuple>
|
||||
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
@ -19,7 +20,7 @@ LEDBAT::LEDBAT(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_siz
|
||||
_time_start_offset = clock::now();
|
||||
}
|
||||
|
||||
size_t LEDBAT::canSend(void) {
|
||||
int64_t LEDBAT::canSend(void) {
|
||||
if (_in_flight.empty()) {
|
||||
return MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
}
|
||||
@ -34,9 +35,7 @@ size_t LEDBAT::canSend(void) {
|
||||
return 0u;
|
||||
}
|
||||
|
||||
size_t space = std::ceil(std::min<float>(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
|
||||
return space;
|
||||
return std::ceil(std::min<float>(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
}
|
||||
|
||||
std::vector<LEDBAT::SeqIDType> LEDBAT::getTimeouts(void) const {
|
||||
|
@ -61,7 +61,7 @@ struct LEDBAT : public CCAI{
|
||||
// TODO: api for how much data we should send
|
||||
// take time since last sent into account
|
||||
// respect max_byterate_allowed
|
||||
size_t canSend(void) override;
|
||||
int64_t canSend(void) override;
|
||||
|
||||
// get the list of timed out seq_ids
|
||||
std::vector<SeqIDType> getTimeouts(void) const override;
|
||||
@ -86,7 +86,7 @@ struct LEDBAT : public CCAI{
|
||||
|
||||
// moving avg over the last few delay samples
|
||||
// VERY sensitive to bundling acks
|
||||
float getCurrentDelay(void) const;
|
||||
float getCurrentDelay(void) const override;
|
||||
|
||||
void addRTT(float new_delay);
|
||||
|
||||
|
@ -143,7 +143,7 @@ bool NGCFT1::sendPKG_FT1_MESSAGE(
|
||||
return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK;
|
||||
}
|
||||
|
||||
void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set) {
|
||||
void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size) {
|
||||
auto& tf_opt = peer.send_transfers.at(idx);
|
||||
assert(tf_opt.has_value());
|
||||
auto& tf = tf_opt.value();
|
||||
@ -177,14 +177,13 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
return;
|
||||
case State::SENDING: {
|
||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||
// no ack after 5 sec -> resend
|
||||
//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
|
||||
if (timeouts_set.count({idx, id})) {
|
||||
if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) {
|
||||
// TODO: can fail
|
||||
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
|
||||
peer.cca->onLoss({idx, id}, false);
|
||||
time_since_activity = 0.f;
|
||||
timeouts_set.erase({idx, id});
|
||||
can_packet_size -= data.size();
|
||||
}
|
||||
});
|
||||
|
||||
@ -211,21 +210,10 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
}
|
||||
|
||||
// if chunks in flight < window size (2)
|
||||
//while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) {
|
||||
int64_t can_packet_size {static_cast<int64_t>(peer.cca->canSend())};
|
||||
//if (can_packet_size) {
|
||||
//std::cerr << "FT: can_packet_size: " << can_packet_size;
|
||||
//}
|
||||
size_t count {0};
|
||||
while (can_packet_size > 0 && tf.file_size > 0) {
|
||||
std::vector<uint8_t> new_data;
|
||||
|
||||
// TODO: parameterize packet size? -> only if JF increases lossy packet size >:)
|
||||
//size_t chunk_size = std::min<size_t>(496u, tf.file_size - tf.file_size_current);
|
||||
//size_t chunk_size = std::min<size_t>(can_packet_size, tf.file_size - tf.file_size_current);
|
||||
size_t chunk_size = std::min<size_t>({
|
||||
//496u,
|
||||
//996u,
|
||||
peer.cca->MAXIMUM_SEGMENT_DATA_SIZE,
|
||||
static_cast<size_t>(can_packet_size),
|
||||
tf.file_size - tf.file_size_current
|
||||
@ -237,14 +225,6 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
|
||||
new_data.resize(chunk_size);
|
||||
|
||||
//ngc_ft1_ctx->cb_send_data[tf.file_kind](
|
||||
//tox,
|
||||
//group_number, peer_number,
|
||||
//idx,
|
||||
//tf.file_size_current,
|
||||
//new_data.data(), new_data.size(),
|
||||
//ngc_ft1_ctx->ud_send_data.count(tf.file_kind) ? ngc_ft1_ctx->ud_send_data.at(tf.file_kind) : nullptr
|
||||
//);
|
||||
assert(idx <= 0xffu);
|
||||
// TODO: check return value
|
||||
dispatch(
|
||||
@ -267,22 +247,17 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
|
||||
tf.file_size_current += chunk_size;
|
||||
can_packet_size -= chunk_size;
|
||||
count++;
|
||||
}
|
||||
//if (count) {
|
||||
//std::cerr << " split over " << count << "\n";
|
||||
//}
|
||||
}
|
||||
break;
|
||||
case State::FINISHING: // we still have unacked packets
|
||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||
// no ack after 5 sec -> resend
|
||||
//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
|
||||
if (timeouts_set.count({idx, id})) {
|
||||
if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) {
|
||||
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
|
||||
peer.cca->onLoss({idx, id}, false);
|
||||
time_since_activity = 0.f;
|
||||
timeouts_set.erase({idx, id});
|
||||
can_packet_size -= data.size();
|
||||
}
|
||||
});
|
||||
if (tf.time_since_activity >= sending_give_up_after) {
|
||||
@ -311,9 +286,20 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
|
||||
auto timeouts = peer.cca->getTimeouts();
|
||||
std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
|
||||
|
||||
for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) {
|
||||
int64_t can_packet_size {peer.cca->canSend()}; // might get more space while iterating (time)
|
||||
|
||||
// change iterat start position to not starve transfers in the back
|
||||
size_t iterated_count = 0;
|
||||
bool last_send_found = false;
|
||||
for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) {
|
||||
idx = idx % peer.send_transfers.size();
|
||||
|
||||
if (peer.send_transfers.at(idx).has_value()) {
|
||||
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set);
|
||||
if (!last_send_found && can_packet_size <= 0) {
|
||||
peer.next_send_transfer_send_idx = idx;
|
||||
last_send_found = true; // only set once
|
||||
}
|
||||
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -599,7 +585,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
|
||||
|
||||
Group::Peer& peer = groups[e.group_number].peers[e.peer_number];
|
||||
if (!peer.send_transfers[e.transfer_id].has_value()) {
|
||||
std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n";
|
||||
// we delete directly, packets might still be in flight (in practice they are when ce)
|
||||
//std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -188,6 +188,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
|
||||
};
|
||||
std::array<std::optional<SendTransfer>, 256> send_transfers;
|
||||
size_t next_send_transfer_idx {0}; // next id will be 0
|
||||
size_t next_send_transfer_send_idx {0};
|
||||
};
|
||||
std::map<uint32_t, Peer> peers;
|
||||
};
|
||||
@ -201,7 +202,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
|
||||
bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size);
|
||||
bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size);
|
||||
|
||||
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set);
|
||||
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
|
||||
void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
|
||||
|
||||
public:
|
||||
|
@ -1042,6 +1042,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
|
||||
//reg.emplace<Components::TimestampWritten>(new_msg_e, 0);
|
||||
reg.emplace<Message::Components::Timestamp>(new_msg_e, ts); // reactive?
|
||||
|
||||
reg.emplace<Message::Components::TagUnread>(new_msg_e);
|
||||
|
||||
{ // by whom
|
||||
auto& synced_by = reg.get_or_emplace<Message::Components::SyncedBy>(new_msg_e).list;
|
||||
synced_by.emplace(self_c);
|
||||
@ -1325,6 +1327,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
|
||||
reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
|
||||
reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
|
||||
reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
|
||||
reg_ptr->emplace<Message::Components::Read>(msg_e, ts);
|
||||
|
||||
reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e);
|
||||
reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
|
||||
|
Reference in New Issue
Block a user