Compare commits
37 Commits
cca_rework
...
broken_som
Author | SHA1 | Date | |
---|---|---|---|
ae3dc74933 | |||
0eb30246a8 | |||
c52ac19285 | |||
1231e792a7 | |||
319e754aff | |||
a4201f4407 | |||
57575330dd | |||
eb2a19d8f3 | |||
dfcb5dee97 | |||
0d40d1abaa | |||
61b667a4aa | |||
c03282eae8 | |||
5fd1f2ab84 | |||
bccd04316a | |||
ccf66fb80c | |||
ea032244e7 | |||
0df0760c06 | |||
f02b03da7c | |||
103f36f2d2 | |||
ad918a3253 | |||
70cea0d219 | |||
b0e2cab17a | |||
0a53a76eb3 | |||
5995059777 | |||
abf2645099 | |||
7c16c54649 | |||
a80e74065c | |||
77f21f01e9 | |||
27fd9e688b | |||
f28e79dcbc | |||
7af5fda0a6 | |||
f91780c602 | |||
1e6929c93b | |||
81a353570b | |||
070585ab3d | |||
ba8befbb2d | |||
a1a9bf886a |
@ -65,5 +65,7 @@ target_link_libraries(solanaceae_sha1_ngcft1 PUBLIC
|
||||
sha1::sha1
|
||||
solanaceae_tox_contacts
|
||||
solanaceae_message3
|
||||
solanaceae_object_store
|
||||
solanaceae_file2
|
||||
)
|
||||
|
||||
|
@ -3,8 +3,8 @@
|
||||
#include <iostream>
|
||||
|
||||
NGCEXTEventProvider::NGCEXTEventProvider(ToxEventProviderI& tep) : _tep(tep) {
|
||||
_tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_CUSTOM_PACKET);
|
||||
_tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET);
|
||||
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PACKET);
|
||||
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET);
|
||||
}
|
||||
|
||||
#define _DATA_HAVE(x, error) if ((data_size - curser) < (x)) { error; }
|
||||
@ -112,6 +112,8 @@ bool NGCEXTEventProvider::parse_ft1_init_ack(
|
||||
_DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
|
||||
e.transfer_id = data[curser++];
|
||||
|
||||
e.max_lossy_data_size = 500-4; // -4 and 500 are hardcoded
|
||||
|
||||
return dispatch(
|
||||
NGCEXT_Event::FT1_INIT_ACK,
|
||||
e
|
||||
@ -224,6 +226,41 @@ bool NGCEXTEventProvider::parse_ft1_message(
|
||||
);
|
||||
}
|
||||
|
||||
bool NGCEXTEventProvider::parse_ft1_init_ack_v2(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool _private
|
||||
) {
|
||||
if (!_private) {
|
||||
std::cerr << "NGCEXT: ft1_init_ack_v2 cant be public\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
Events::NGCEXT_ft1_init_ack e;
|
||||
e.group_number = group_number;
|
||||
e.peer_number = peer_number;
|
||||
size_t curser = 0;
|
||||
|
||||
// - 1 byte (temporary_file_tf_id)
|
||||
_DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
|
||||
e.transfer_id = data[curser++];
|
||||
|
||||
// - 2 byte (max_lossy_data_size)
|
||||
if ((data_size - curser) >= sizeof(e.max_lossy_data_size)) {
|
||||
e.max_lossy_data_size = 0;
|
||||
for (size_t i = 0; i < sizeof(e.max_lossy_data_size); i++, curser++) {
|
||||
e.max_lossy_data_size |= uint16_t(data[curser]) << (i*8);
|
||||
}
|
||||
} else {
|
||||
e.max_lossy_data_size = 500-4; // default
|
||||
}
|
||||
|
||||
return dispatch(
|
||||
NGCEXT_Event::FT1_INIT_ACK,
|
||||
e
|
||||
);
|
||||
}
|
||||
|
||||
bool NGCEXTEventProvider::handlePacket(
|
||||
const uint32_t group_number,
|
||||
const uint32_t peer_number,
|
||||
@ -247,7 +284,8 @@ bool NGCEXTEventProvider::handlePacket(
|
||||
case NGCEXT_Event::FT1_INIT:
|
||||
return parse_ft1_init(group_number, peer_number, data+1, data_size-1, _private);
|
||||
case NGCEXT_Event::FT1_INIT_ACK:
|
||||
return parse_ft1_init_ack(group_number, peer_number, data+1, data_size-1, _private);
|
||||
//return parse_ft1_init_ack(group_number, peer_number, data+1, data_size-1, _private);
|
||||
return parse_ft1_init_ack_v2(group_number, peer_number, data+1, data_size-1, _private);
|
||||
case NGCEXT_Event::FT1_DATA:
|
||||
return parse_ft1_data(group_number, peer_number, data+1, data_size-1, _private);
|
||||
case NGCEXT_Event::FT1_DATA_ACK:
|
||||
|
@ -70,8 +70,6 @@ namespace Events {
|
||||
|
||||
// - X bytes (file_kind dependent id, differnt sizes)
|
||||
std::vector<uint8_t> file_id;
|
||||
|
||||
// TODO: max supported lossy packet size
|
||||
};
|
||||
|
||||
struct NGCEXT_ft1_init_ack {
|
||||
@ -81,7 +79,8 @@ namespace Events {
|
||||
// - 1 byte (transfer_id)
|
||||
uint8_t transfer_id;
|
||||
|
||||
// TODO: max supported lossy packet size
|
||||
// - 2 byte (self_max_lossy_data_size)
|
||||
uint16_t max_lossy_data_size;
|
||||
};
|
||||
|
||||
struct NGCEXT_ft1_data {
|
||||
@ -163,6 +162,7 @@ enum class NGCEXT_Event : uint8_t {
|
||||
// acknowlage init (like an accept)
|
||||
// like tox ft control continue
|
||||
// - 1 byte (transfer_id)
|
||||
// - 2 byte (self_max_lossy_data_size) (optional since v2)
|
||||
FT1_INIT_ACK,
|
||||
|
||||
// TODO: init deny, speed up non acceptance
|
||||
@ -263,6 +263,12 @@ class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI {
|
||||
bool _private
|
||||
);
|
||||
|
||||
bool parse_ft1_init_ack_v2(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool _private
|
||||
);
|
||||
|
||||
bool handlePacket(
|
||||
const uint32_t group_number,
|
||||
const uint32_t peer_number,
|
||||
|
@ -38,22 +38,34 @@ struct CCAI {
|
||||
//static_assert(maximum_segment_size == 574); // mesured in wireshark
|
||||
|
||||
// flow control
|
||||
float max_byterate_allowed {10*1024*1024}; // 10MiB/s
|
||||
//float max_byterate_allowed {100.f*1024*1024}; // 100MiB/s
|
||||
float max_byterate_allowed {10.f*1024*1024}; // 10MiB/s
|
||||
//float max_byterate_allowed {1.f*1024*1024}; // 1MiB/s
|
||||
//float max_byterate_allowed {0.6f*1024*1024}; // 600KiB/s
|
||||
//float max_byterate_allowed {0.5f*1024*1024}; // 500KiB/s
|
||||
//float max_byterate_allowed {0.15f*1024*1024}; // 150KiB/s
|
||||
//float max_byterate_allowed {0.05f*1024*1024}; // 50KiB/s
|
||||
|
||||
public: // api
|
||||
CCAI(size_t maximum_segment_data_size) : MAXIMUM_SEGMENT_DATA_SIZE(maximum_segment_data_size) {}
|
||||
virtual ~CCAI(void) {}
|
||||
|
||||
// returns current rtt/delay
|
||||
virtual float getCurrentDelay(void) const = 0;
|
||||
|
||||
// return the current believed window in bytes of how much data can be inflight,
|
||||
//virtual float getCWnD(void) const = 0;
|
||||
virtual float getWindow(void) = 0;
|
||||
|
||||
// TODO: api for how much data we should send
|
||||
// take time since last sent into account
|
||||
// respect max_byterate_allowed
|
||||
virtual size_t canSend(void) = 0;
|
||||
virtual int64_t canSend(float time_delta) = 0;
|
||||
|
||||
// get the list of timed out seq_ids
|
||||
virtual std::vector<SeqIDType> getTimeouts(void) const = 0;
|
||||
|
||||
virtual int64_t inFlightCount(void) const { return -1; }
|
||||
|
||||
public: // callbacks
|
||||
// data size is without overhead
|
||||
virtual void onSent(SeqIDType seq, size_t data_size) = 0;
|
||||
|
@ -3,14 +3,25 @@
|
||||
#include <cmath>
|
||||
#include <iostream>
|
||||
|
||||
void CUBIC::updateReductionTimer(float time_delta) {
|
||||
const auto now {getTimeNow()};
|
||||
|
||||
// only keep updating while the cca interaction is not too long ago
|
||||
if (now - _time_point_last_update <= getCurrentDelay()*4.f) {
|
||||
_time_since_reduction += time_delta;
|
||||
}
|
||||
}
|
||||
|
||||
void CUBIC::resetReductionTimer(void) {
|
||||
_time_since_reduction = 0.f;
|
||||
}
|
||||
|
||||
float CUBIC::getCWnD(void) const {
|
||||
const double K = cbrt(
|
||||
(_window_max * (1. - BETA)) / SCALING_CONSTANT
|
||||
);
|
||||
|
||||
const double time_since_reduction = getTimeNow() - _time_point_reduction;
|
||||
|
||||
const double TK = time_since_reduction - K;
|
||||
const double TK = _time_since_reduction - K;
|
||||
|
||||
const double cwnd =
|
||||
SCALING_CONSTANT
|
||||
@ -33,29 +44,70 @@ float CUBIC::getCWnD(void) const {
|
||||
}
|
||||
|
||||
void CUBIC::onCongestion(void) {
|
||||
if (getTimeNow() - _time_point_reduction >= getCurrentDelay()) {
|
||||
const auto current_cwnd = getCWnD();
|
||||
_time_point_reduction = getTimeNow();
|
||||
_window_max = current_cwnd;
|
||||
// 8 is probably too much (800ms for 100ms rtt)
|
||||
if (_time_since_reduction >= getCurrentDelay()*4.f) {
|
||||
const auto tmp_old_tp = _time_since_reduction;
|
||||
|
||||
std::cout << "CONGESTION! cwnd:" << current_cwnd << "\n";
|
||||
const auto current_cwnd = getCWnD(); // TODO: remove, only used by logging?
|
||||
const auto current_wnd = getWindow(); // respects cwnd and fwnd
|
||||
|
||||
_bytes_leftover = 0;
|
||||
resetReductionTimer();
|
||||
|
||||
if (current_cwnd < _window_max) {
|
||||
// congestion before reaching the inflection point (prev window_max).
|
||||
// reduce to wnd*beta to be fair
|
||||
_window_max = current_wnd * BETA;
|
||||
} else {
|
||||
_window_max = current_wnd;
|
||||
}
|
||||
|
||||
_window_max = std::max(_window_max, 2.0*MAXIMUM_SEGMENT_SIZE);
|
||||
|
||||
#if 1
|
||||
std::cout << "----CONGESTION!"
|
||||
<< " cwnd:" << current_cwnd
|
||||
<< " wnd:" << current_wnd
|
||||
<< " cwnd_max:" << _window_max
|
||||
<< " pts:" << tmp_old_tp
|
||||
<< " rtt:" << getCurrentDelay()
|
||||
<< "\n"
|
||||
;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
size_t CUBIC::canSend(void) {
|
||||
const auto fspace_pkgs = FlowOnly::canSend();
|
||||
float CUBIC::getWindow(void) {
|
||||
return std::min<float>(getCWnD(), FlowOnly::getWindow());
|
||||
}
|
||||
|
||||
int64_t CUBIC::canSend(float time_delta) {
|
||||
const auto fspace_pkgs = FlowOnly::canSend(time_delta);
|
||||
|
||||
updateReductionTimer(time_delta);
|
||||
|
||||
if (fspace_pkgs == 0u) {
|
||||
return 0u;
|
||||
}
|
||||
|
||||
const int64_t cspace_bytes = getCWnD() - _in_flight_bytes;
|
||||
const auto window = getCWnD();
|
||||
int64_t cspace_bytes = (window - _in_flight_bytes) + _bytes_leftover;
|
||||
if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) {
|
||||
return 0u;
|
||||
}
|
||||
|
||||
// also limit to max sendrate per tick, which is usually smaller than window
|
||||
// this is mostly to prevent spikes on empty windows
|
||||
const auto rate = window / getCurrentDelay();
|
||||
|
||||
// we dont want this limit to fall below atleast 1 segment
|
||||
const int64_t max_bytes_per_tick = std::max<int64_t>(rate * time_delta + 0.5f, MAXIMUM_SEGMENT_SIZE);
|
||||
cspace_bytes = std::min<int64_t>(cspace_bytes, max_bytes_per_tick);
|
||||
|
||||
// limit to whole packets
|
||||
size_t cspace_pkgs = std::floor(cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
int64_t cspace_pkgs = (cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
|
||||
_bytes_leftover = cspace_bytes - cspace_pkgs;
|
||||
|
||||
return std::min(cspace_pkgs, fspace_pkgs);
|
||||
}
|
||||
|
@ -8,7 +8,8 @@ struct CUBIC : public FlowOnly {
|
||||
//using clock = std::chrono::steady_clock;
|
||||
|
||||
public: // config
|
||||
static constexpr float BETA {0.7f};
|
||||
//static constexpr float BETA {0.7f};
|
||||
static constexpr float BETA {0.8f};
|
||||
static constexpr float SCALING_CONSTANT {0.4f};
|
||||
static constexpr float RTT_EMA_ALPHA = 0.1f; // 0.1 is very smooth, might need more
|
||||
|
||||
@ -16,9 +17,14 @@ struct CUBIC : public FlowOnly {
|
||||
// window size before last reduciton
|
||||
double _window_max {2.f * MAXIMUM_SEGMENT_SIZE}; // start with mss*2
|
||||
//double _window_last_max {2.f * MAXIMUM_SEGMENT_SIZE};
|
||||
double _time_point_reduction {getTimeNow()};
|
||||
|
||||
double _time_since_reduction {12.f}; // warm start
|
||||
int64_t _bytes_leftover {0};
|
||||
|
||||
private:
|
||||
void updateReductionTimer(float time_delta);
|
||||
void resetReductionTimer(void);
|
||||
|
||||
float getCWnD(void) const;
|
||||
|
||||
// moving avg over the last few delay samples
|
||||
@ -31,11 +37,14 @@ struct CUBIC : public FlowOnly {
|
||||
|
||||
public: // api
|
||||
CUBIC(size_t maximum_segment_data_size) : FlowOnly(maximum_segment_data_size) {}
|
||||
virtual ~CUBIC(void) {}
|
||||
|
||||
float getWindow(void) override;
|
||||
|
||||
// TODO: api for how much data we should send
|
||||
// take time since last sent into account
|
||||
// respect max_byterate_allowed
|
||||
size_t canSend(void) override;
|
||||
int64_t canSend(float time_delta) override;
|
||||
|
||||
// get the list of timed out seq_ids
|
||||
//std::vector<SeqIDType> getTimeouts(void) const override;
|
||||
|
@ -6,10 +6,16 @@
|
||||
#include <algorithm>
|
||||
|
||||
float FlowOnly::getCurrentDelay(void) const {
|
||||
return std::min(_rtt_ema, RTT_MAX);
|
||||
// below 1ms is useless
|
||||
return std::clamp(_rtt_ema, 0.001f, RTT_MAX);
|
||||
}
|
||||
|
||||
void FlowOnly::addRTT(float new_delay) {
|
||||
if (new_delay > _rtt_ema * RTT_UP_MAX) {
|
||||
// too large a jump up, to be taken into account
|
||||
return;
|
||||
}
|
||||
|
||||
// lerp(new_delay, rtt_ema, 0.1)
|
||||
_rtt_ema = RTT_EMA_ALPHA * new_delay + (1.f - RTT_EMA_ALPHA) * _rtt_ema;
|
||||
}
|
||||
@ -23,7 +29,40 @@ void FlowOnly::updateWindow(void) {
|
||||
_fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
|
||||
}
|
||||
|
||||
size_t FlowOnly::canSend(void) {
|
||||
void FlowOnly::updateCongestion(void) {
|
||||
const auto tmp_window = getWindow();
|
||||
// packet window * 0.3
|
||||
// but atleast 4
|
||||
int32_t max_consecutive_events = std::clamp<int32_t>(
|
||||
(tmp_window/MAXIMUM_SEGMENT_DATA_SIZE) * 0.3f,
|
||||
4,
|
||||
50 // limit TODO: fix idle/time starved algo
|
||||
);
|
||||
// TODO: magic number
|
||||
|
||||
#if 0
|
||||
std::cout << "NGC_FT1 Flow: pkg out of order"
|
||||
<< " w:" << tmp_window
|
||||
<< " pw:" << tmp_window/MAXIMUM_SEGMENT_DATA_SIZE
|
||||
<< " coe:" << _consecutive_events
|
||||
<< " mcoe:" << max_consecutive_events
|
||||
<< "\n";
|
||||
#endif
|
||||
|
||||
if (_consecutive_events > max_consecutive_events) {
|
||||
//std::cout << "CONGESTION! NGC_FT1 flow: pkg out of order\n";
|
||||
onCongestion();
|
||||
|
||||
// TODO: set _consecutive_events to zero?
|
||||
}
|
||||
}
|
||||
|
||||
float FlowOnly::getWindow(void) {
|
||||
updateWindow();
|
||||
return _fwnd;
|
||||
}
|
||||
|
||||
int64_t FlowOnly::canSend(float time_delta) {
|
||||
if (_in_flight.empty()) {
|
||||
assert(_in_flight_bytes == 0);
|
||||
return MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
@ -31,16 +70,17 @@ size_t FlowOnly::canSend(void) {
|
||||
|
||||
updateWindow();
|
||||
|
||||
const int64_t fspace = _fwnd - _in_flight_bytes;
|
||||
int64_t fspace = _fwnd - _in_flight_bytes;
|
||||
if (fspace < MAXIMUM_SEGMENT_DATA_SIZE) {
|
||||
return 0u;
|
||||
}
|
||||
|
||||
// limit to whole packets
|
||||
size_t space = std::floor(fspace / MAXIMUM_SEGMENT_DATA_SIZE)
|
||||
* MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
// also limit to max sendrate per tick, which is usually smaller than window
|
||||
// this is mostly to prevent spikes on empty windows
|
||||
fspace = std::min<int64_t>(fspace, max_byterate_allowed * time_delta + 0.5f);
|
||||
|
||||
return space;
|
||||
// limit to whole packets
|
||||
return (fspace / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
}
|
||||
|
||||
std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
|
||||
@ -49,7 +89,7 @@ std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
|
||||
// after 3 rtt delay, we trigger timeout
|
||||
const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;
|
||||
|
||||
for (const auto& [seq, time_stamp, size] : _in_flight) {
|
||||
for (const auto& [seq, time_stamp, size, _] : _in_flight) {
|
||||
if (now_adjusted > time_stamp) {
|
||||
list.push_back(seq);
|
||||
}
|
||||
@ -58,16 +98,28 @@ std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
|
||||
return list;
|
||||
}
|
||||
|
||||
int64_t FlowOnly::inFlightCount(void) const {
|
||||
return _in_flight.size();
|
||||
}
|
||||
|
||||
void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
|
||||
if constexpr (true) {
|
||||
for (const auto& it : _in_flight) {
|
||||
assert(std::get<0>(it) != seq);
|
||||
assert(it.id != seq);
|
||||
}
|
||||
}
|
||||
|
||||
_in_flight.push_back({seq, getTimeNow(), data_size + SEGMENT_OVERHEAD});
|
||||
_in_flight_bytes += data_size + SEGMENT_OVERHEAD;
|
||||
//_recently_sent_bytes += data_size + SEGMENT_OVERHEAD;
|
||||
const auto& new_entry = _in_flight.emplace_back(
|
||||
FlyingBunch{
|
||||
seq,
|
||||
static_cast<float>(getTimeNow()),
|
||||
data_size + SEGMENT_OVERHEAD,
|
||||
false
|
||||
}
|
||||
);
|
||||
_in_flight_bytes += new_entry.bytes;
|
||||
|
||||
_time_point_last_update = getTimeNow();
|
||||
}
|
||||
|
||||
void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
|
||||
@ -78,28 +130,31 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
|
||||
|
||||
const auto now {getTimeNow()};
|
||||
|
||||
_time_point_last_update = now;
|
||||
|
||||
// first seq in seqs is the actual value, all extra are for redundency
|
||||
{ // skip in ack is congestion event
|
||||
// 1. look at primary ack of packet
|
||||
auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq = seqs.front()](const auto& v) -> bool {
|
||||
return std::get<0>(v) == seq;
|
||||
return v.id == seq;
|
||||
});
|
||||
if (it != _in_flight.end()) {
|
||||
if (it != _in_flight.begin()) {
|
||||
if (it != _in_flight.end() && !it->ignore) {
|
||||
// find first non ignore, it should be the expected
|
||||
auto first_it = std::find_if_not(_in_flight.cbegin(), _in_flight.cend(), [](const auto& v) -> bool { return v.ignore; });
|
||||
|
||||
if (first_it != _in_flight.cend() && it != first_it) {
|
||||
// not next expected seq -> skip detected
|
||||
|
||||
std::cout << "CONGESTION out of order\n";
|
||||
onCongestion();
|
||||
//if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
|
||||
//_recently_lost_data = true;
|
||||
//_last_congestion_event = getTimeNow();
|
||||
//_last_congestion_rtt = getCurrentDelay();
|
||||
//}
|
||||
_consecutive_events++;
|
||||
it->ignore = true; // only handle once
|
||||
|
||||
updateCongestion();
|
||||
} else {
|
||||
// only mesure delay, if not a congestion
|
||||
addRTT(now - std::get<1>(*it));
|
||||
addRTT(now - it->timestamp);
|
||||
_consecutive_events = 0;
|
||||
}
|
||||
} else {
|
||||
} else { // TOOD: if ! ignore too
|
||||
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
#if 0
|
||||
// assume we got a duplicated packet
|
||||
@ -111,14 +166,14 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
|
||||
|
||||
for (const auto& seq : seqs) {
|
||||
auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
|
||||
return std::get<0>(v) == seq;
|
||||
return v.id == seq;
|
||||
});
|
||||
|
||||
if (it == _in_flight.end()) {
|
||||
continue; // not found, ignore
|
||||
} else {
|
||||
//most_recent = std::max(most_recent, std::get<1>(*it));
|
||||
_in_flight_bytes -= std::get<2>(*it);
|
||||
_in_flight_bytes -= it->bytes;
|
||||
assert(_in_flight_bytes >= 0);
|
||||
//_recently_acked_data += std::get<2>(*it);
|
||||
_in_flight.erase(it);
|
||||
@ -128,8 +183,8 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
|
||||
|
||||
void FlowOnly::onLoss(SeqIDType seq, bool discard) {
|
||||
auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
|
||||
assert(!std::isnan(std::get<1>(v)));
|
||||
return std::get<0>(v) == seq;
|
||||
assert(!std::isnan(v.timestamp));
|
||||
return v.id == seq;
|
||||
});
|
||||
|
||||
if (it == _in_flight.end()) {
|
||||
@ -137,24 +192,27 @@ void FlowOnly::onLoss(SeqIDType seq, bool discard) {
|
||||
return; // not found, ignore ??
|
||||
}
|
||||
|
||||
std::cerr << "FLOW loss\n";
|
||||
//std::cerr << "FLOW loss\n";
|
||||
|
||||
// "if data lost is not to be retransmitted"
|
||||
if (discard) {
|
||||
_in_flight_bytes -= std::get<2>(*it);
|
||||
_in_flight_bytes -= it->bytes;
|
||||
assert(_in_flight_bytes >= 0);
|
||||
_in_flight.erase(it);
|
||||
}
|
||||
// TODO: reset timestamp?
|
||||
|
||||
#if 0 // temporarily disable ce for timeout
|
||||
// at most once per rtt?
|
||||
// TODO: use delay at event instead
|
||||
if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
|
||||
_recently_lost_data = true;
|
||||
_last_congestion_event = getTimeNow();
|
||||
_last_congestion_rtt = getCurrentDelay();
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
// and not take into rtt
|
||||
it->timestamp = getTimeNow();
|
||||
it->ignore = true;
|
||||
}
|
||||
|
||||
// usually after data arrived out-of-order/duplicate
|
||||
if (!it->ignore) {
|
||||
it->ignore = true; // only handle once
|
||||
//_consecutive_events++;
|
||||
|
||||
//updateCongestion();
|
||||
// this is usually a safe indicator for congestion/maxed connection
|
||||
onCongestion();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -11,17 +11,10 @@ struct FlowOnly : public CCAI {
|
||||
using clock = std::chrono::steady_clock;
|
||||
|
||||
public: // config
|
||||
static constexpr float RTT_EMA_ALPHA = 0.1f; // might need over time
|
||||
static constexpr float RTT_EMA_ALPHA = 0.001f; // might need change over time
|
||||
static constexpr float RTT_UP_MAX = 3.0f; // how much larger a delay can be to be taken into account
|
||||
static constexpr float RTT_MAX = 2.f; // 2 sec is probably too much
|
||||
|
||||
//float max_byterate_allowed {100.f*1024*1024}; // 100MiB/s
|
||||
float max_byterate_allowed {10.f*1024*1024}; // 10MiB/s
|
||||
//float max_byterate_allowed {1.f*1024*1024}; // 1MiB/s
|
||||
//float max_byterate_allowed {0.6f*1024*1024}; // 600KiB/s
|
||||
//float max_byterate_allowed {0.5f*1024*1024}; // 500KiB/s
|
||||
//float max_byterate_allowed {0.05f*1024*1024}; // 50KiB/s
|
||||
//float max_byterate_allowed {0.15f*1024*1024}; // 150KiB/s
|
||||
|
||||
protected:
|
||||
// initialize to low value, will get corrected very fast
|
||||
float _fwnd {0.01f * max_byterate_allowed}; // in bytes
|
||||
@ -30,11 +23,24 @@ struct FlowOnly : public CCAI {
|
||||
float _rtt_ema {0.1f};
|
||||
|
||||
// list of sequence ids and timestamps of when they where sent (and payload size)
|
||||
std::vector<std::tuple<SeqIDType, float, size_t>> _in_flight;
|
||||
struct FlyingBunch {
|
||||
SeqIDType id;
|
||||
float timestamp;
|
||||
size_t bytes;
|
||||
|
||||
// set to true if counted as ce or resent due to timeout
|
||||
bool ignore {false};
|
||||
};
|
||||
std::vector<FlyingBunch> _in_flight;
|
||||
int64_t _in_flight_bytes {0};
|
||||
|
||||
int32_t _consecutive_events {0};
|
||||
|
||||
clock::time_point _time_start_offset;
|
||||
|
||||
// used to clamp growth rate in the void
|
||||
double _time_point_last_update {getTimeNow()};
|
||||
|
||||
protected:
|
||||
// make values relative to algo start for readability (and precision)
|
||||
// get timestamp in seconds
|
||||
@ -44,7 +50,9 @@ struct FlowOnly : public CCAI {
|
||||
|
||||
// moving avg over the last few delay samples
|
||||
// VERY sensitive to bundling acks
|
||||
float getCurrentDelay(void) const;
|
||||
float getCurrentDelay(void) const override;
|
||||
|
||||
float getWindow(void) override;
|
||||
|
||||
void addRTT(float new_delay);
|
||||
|
||||
@ -52,17 +60,23 @@ struct FlowOnly : public CCAI {
|
||||
|
||||
virtual void onCongestion(void) {};
|
||||
|
||||
// internal logic, calls the onCongestion() event
|
||||
void updateCongestion(void);
|
||||
|
||||
public: // api
|
||||
FlowOnly(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {}
|
||||
virtual ~FlowOnly(void) {}
|
||||
|
||||
// TODO: api for how much data we should send
|
||||
// take time since last sent into account
|
||||
// respect max_byterate_allowed
|
||||
size_t canSend(void) override;
|
||||
int64_t canSend(float time_delta) override;
|
||||
|
||||
// get the list of timed out seq_ids
|
||||
std::vector<SeqIDType> getTimeouts(void) const override;
|
||||
|
||||
int64_t inFlightCount(void) const override;
|
||||
|
||||
public: // callbacks
|
||||
// data size is without overhead
|
||||
void onSent(SeqIDType seq, size_t data_size) override;
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <deque>
|
||||
#include <cstdint>
|
||||
#include <cassert>
|
||||
#include <tuple>
|
||||
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
@ -19,7 +20,7 @@ LEDBAT::LEDBAT(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_siz
|
||||
_time_start_offset = clock::now();
|
||||
}
|
||||
|
||||
size_t LEDBAT::canSend(void) {
|
||||
int64_t LEDBAT::canSend(float time_delta) {
|
||||
if (_in_flight.empty()) {
|
||||
return MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
}
|
||||
@ -34,9 +35,7 @@ size_t LEDBAT::canSend(void) {
|
||||
return 0u;
|
||||
}
|
||||
|
||||
size_t space = std::ceil(std::min<float>(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
|
||||
return space;
|
||||
return std::ceil(std::min<float>(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
|
||||
}
|
||||
|
||||
std::vector<LEDBAT::SeqIDType> LEDBAT::getTimeouts(void) const {
|
||||
|
@ -47,21 +47,20 @@ struct LEDBAT : public CCAI{
|
||||
|
||||
//static constexpr size_t rtt_buffer_size_max {2000};
|
||||
|
||||
float max_byterate_allowed {10*1024*1024}; // 10MiB/s
|
||||
|
||||
public:
|
||||
LEDBAT(size_t maximum_segment_data_size);
|
||||
virtual ~LEDBAT(void) {}
|
||||
|
||||
// return the current believed window in bytes of how much data can be inflight,
|
||||
// without overstepping the delay requirement
|
||||
float getCWnD(void) const {
|
||||
float getWindow(void) override {
|
||||
return _cwnd;
|
||||
}
|
||||
|
||||
// TODO: api for how much data we should send
|
||||
// take time since last sent into account
|
||||
// respect max_byterate_allowed
|
||||
size_t canSend(void) override;
|
||||
int64_t canSend(float time_delta) override;
|
||||
|
||||
// get the list of timed out seq_ids
|
||||
std::vector<SeqIDType> getTimeouts(void) const override;
|
||||
@ -86,7 +85,7 @@ struct LEDBAT : public CCAI{
|
||||
|
||||
// moving avg over the last few delay samples
|
||||
// VERY sensitive to bundling acks
|
||||
float getCurrentDelay(void) const;
|
||||
float getCurrentDelay(void) const override;
|
||||
|
||||
void addRTT(float new_delay);
|
||||
|
||||
|
@ -1,9 +1,14 @@
|
||||
#include "./ngcft1.hpp"
|
||||
|
||||
#include <solanaceae/toxcore/utils.hpp>
|
||||
#include "./flow_only.hpp"
|
||||
#include "./cubic.hpp"
|
||||
#include "./ledbat.hpp"
|
||||
|
||||
#include <solanaceae/util/utils.hpp>
|
||||
|
||||
#include <sodium.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <iostream>
|
||||
#include <set>
|
||||
#include <algorithm>
|
||||
@ -71,6 +76,12 @@ bool NGCFT1::sendPKG_FT1_INIT_ACK(
|
||||
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_INIT_ACK));
|
||||
pkg.push_back(transfer_id);
|
||||
|
||||
// - 2 bytes max_lossy_data_size
|
||||
const uint16_t max_lossy_data_size = _t.toxGroupMaxCustomLossyPacketLength() - 4;
|
||||
for (size_t i = 0; i < sizeof(uint16_t); i++) {
|
||||
pkg.push_back((max_lossy_data_size>>(i*8)) & 0xff);
|
||||
}
|
||||
|
||||
// lossless
|
||||
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
|
||||
}
|
||||
@ -87,6 +98,7 @@ bool NGCFT1::sendPKG_FT1_DATA(
|
||||
// check header_size+data_size <= max pkg size
|
||||
|
||||
std::vector<uint8_t> pkg;
|
||||
pkg.reserve(2048); // saves a ton of allocations
|
||||
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA));
|
||||
pkg.push_back(transfer_id);
|
||||
pkg.push_back(sequence_id & 0xff);
|
||||
@ -107,6 +119,7 @@ bool NGCFT1::sendPKG_FT1_DATA_ACK(
|
||||
const uint16_t* seq_ids, size_t seq_ids_size
|
||||
) {
|
||||
std::vector<uint8_t> pkg;
|
||||
pkg.reserve(1+1+2*32); // 32acks in a single pkg should be unlikely
|
||||
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA_ACK));
|
||||
pkg.push_back(transfer_id);
|
||||
|
||||
@ -143,7 +156,7 @@ bool NGCFT1::sendPKG_FT1_MESSAGE(
|
||||
return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK;
|
||||
}
|
||||
|
||||
void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set) {
|
||||
void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size) {
|
||||
auto& tf_opt = peer.send_transfers.at(idx);
|
||||
assert(tf_opt.has_value());
|
||||
auto& tf = tf_opt.value();
|
||||
@ -175,22 +188,36 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
}
|
||||
//break;
|
||||
return;
|
||||
case State::SENDING: {
|
||||
case State::FINISHING: // we still have unacked packets
|
||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||
// no ack after 5 sec -> resend
|
||||
//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
|
||||
if (timeouts_set.count({idx, id})) {
|
||||
// TODO: can fail
|
||||
if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) {
|
||||
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
|
||||
peer.cca->onLoss({idx, id}, false);
|
||||
time_since_activity = 0.f;
|
||||
timeouts_set.erase({idx, id});
|
||||
can_packet_size -= data.size();
|
||||
}
|
||||
});
|
||||
|
||||
if (tf.time_since_activity >= sending_give_up_after) {
|
||||
// no ack after 30sec, close ft
|
||||
std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting\n";
|
||||
// TODO: notify app
|
||||
std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
|
||||
|
||||
// clean up cca
|
||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||
peer.cca->onLoss({idx, id}, true);
|
||||
timeouts_set.erase({idx, id});
|
||||
});
|
||||
|
||||
tf_opt.reset();
|
||||
}
|
||||
break;
|
||||
case State::SENDING: {
|
||||
// first handle overall timeout (could otherwise do resends directly before, which is useless)
|
||||
// timeout increases with active transfers (otherwise we could starve them)
|
||||
if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) {
|
||||
// no ack after 30sec, close ft
|
||||
std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting (ifc:" << peer.cca->inFlightCount() << ")\n";
|
||||
dispatch(
|
||||
NGCFT1_Event::send_done,
|
||||
Events::NGCFT1_send_done{
|
||||
@ -210,22 +237,23 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
return;
|
||||
}
|
||||
|
||||
// do resends
|
||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||
if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) {
|
||||
// TODO: can fail
|
||||
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
|
||||
peer.cca->onLoss({idx, id}, false);
|
||||
time_since_activity = 0.f;
|
||||
timeouts_set.erase({idx, id});
|
||||
can_packet_size -= data.size();
|
||||
}
|
||||
});
|
||||
|
||||
// if chunks in flight < window size (2)
|
||||
//while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) {
|
||||
int64_t can_packet_size {static_cast<int64_t>(peer.cca->canSend())};
|
||||
//if (can_packet_size) {
|
||||
//std::cerr << "FT: can_packet_size: " << can_packet_size;
|
||||
//}
|
||||
size_t count {0};
|
||||
while (can_packet_size > 0 && tf.file_size > 0) {
|
||||
std::vector<uint8_t> new_data;
|
||||
|
||||
// TODO: parameterize packet size? -> only if JF increases lossy packet size >:)
|
||||
//size_t chunk_size = std::min<size_t>(496u, tf.file_size - tf.file_size_current);
|
||||
//size_t chunk_size = std::min<size_t>(can_packet_size, tf.file_size - tf.file_size_current);
|
||||
size_t chunk_size = std::min<size_t>({
|
||||
//496u,
|
||||
//996u,
|
||||
peer.cca->MAXIMUM_SEGMENT_DATA_SIZE,
|
||||
static_cast<size_t>(can_packet_size),
|
||||
tf.file_size - tf.file_size_current
|
||||
@ -237,14 +265,6 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
|
||||
new_data.resize(chunk_size);
|
||||
|
||||
//ngc_ft1_ctx->cb_send_data[tf.file_kind](
|
||||
//tox,
|
||||
//group_number, peer_number,
|
||||
//idx,
|
||||
//tf.file_size_current,
|
||||
//new_data.data(), new_data.size(),
|
||||
//ngc_ft1_ctx->ud_send_data.count(tf.file_kind) ? ngc_ft1_ctx->ud_send_data.at(tf.file_kind) : nullptr
|
||||
//);
|
||||
assert(idx <= 0xffu);
|
||||
// TODO: check return value
|
||||
dispatch(
|
||||
@ -258,45 +278,19 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
);
|
||||
|
||||
uint16_t seq_id = tf.ssb.add(std::move(new_data));
|
||||
sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
|
||||
const bool sent = sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
|
||||
if (sent) {
|
||||
peer.cca->onSent({idx, seq_id}, chunk_size);
|
||||
|
||||
#if defined(EXTRA_LOGGING) && EXTRA_LOGGING == 1
|
||||
fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id);
|
||||
#endif
|
||||
} else {
|
||||
std::cerr << "NGCFT1: failed to send packet (queue full?) --------------\n";
|
||||
peer.cca->onLoss({idx, seq_id}, false); // HACK: fake congestion event
|
||||
// TODO: onCongestion
|
||||
can_packet_size = 0;
|
||||
}
|
||||
|
||||
tf.file_size_current += chunk_size;
|
||||
can_packet_size -= chunk_size;
|
||||
count++;
|
||||
}
|
||||
//if (count) {
|
||||
//std::cerr << " split over " << count << "\n";
|
||||
//}
|
||||
}
|
||||
break;
|
||||
case State::FINISHING: // we still have unacked packets
|
||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||
// no ack after 5 sec -> resend
|
||||
//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
|
||||
if (timeouts_set.count({idx, id})) {
|
||||
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
|
||||
peer.cca->onLoss({idx, id}, false);
|
||||
time_since_activity = 0.f;
|
||||
timeouts_set.erase({idx, id});
|
||||
}
|
||||
});
|
||||
if (tf.time_since_activity >= sending_give_up_after) {
|
||||
// no ack after 30sec, close ft
|
||||
// TODO: notify app
|
||||
std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
|
||||
|
||||
// clean up cca
|
||||
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
|
||||
peer.cca->onLoss({idx, id}, true);
|
||||
timeouts_set.erase({idx, id});
|
||||
});
|
||||
|
||||
tf_opt.reset();
|
||||
}
|
||||
break;
|
||||
default: // invalid state, delete
|
||||
@ -308,12 +302,33 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
}
|
||||
|
||||
void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
|
||||
if (peer.cca) {
|
||||
auto timeouts = peer.cca->getTimeouts();
|
||||
std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
|
||||
|
||||
for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) {
|
||||
int64_t can_packet_size {peer.cca->canSend(time_delta)}; // might get more space while iterating (time)
|
||||
|
||||
// get number current running transfers TODO: improve
|
||||
peer.active_send_transfers = 0;
|
||||
for (const auto& it : peer.send_transfers) {
|
||||
if (it.has_value()) {
|
||||
peer.active_send_transfers++;
|
||||
}
|
||||
}
|
||||
|
||||
// change iterate start position to not starve transfers in the back
|
||||
size_t iterated_count = 0;
|
||||
bool last_send_found = false;
|
||||
for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) {
|
||||
idx = idx % peer.send_transfers.size();
|
||||
|
||||
if (peer.send_transfers.at(idx).has_value()) {
|
||||
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set);
|
||||
if (!last_send_found && can_packet_size <= 0) {
|
||||
peer.next_send_transfer_send_idx = idx;
|
||||
last_send_found = true; // only set once
|
||||
}
|
||||
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -333,16 +348,43 @@ NGCFT1::NGCFT1(
|
||||
_neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK);
|
||||
_neep.subscribe(this, NGCEXT_Event::FT1_MESSAGE);
|
||||
|
||||
_tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_PEER_EXIT);
|
||||
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT);
|
||||
}
|
||||
|
||||
void NGCFT1::iterate(float time_delta) {
|
||||
float NGCFT1::iterate(float time_delta) {
|
||||
bool transfer_in_progress {false};
|
||||
for (auto& [group_number, group] : groups) {
|
||||
for (auto& [peer_number, peer] : group.peers) {
|
||||
iteratePeer(time_delta, group_number, peer_number, peer);
|
||||
|
||||
// find any active transfer
|
||||
if (!transfer_in_progress) {
|
||||
for (const auto& t : peer.send_transfers) {
|
||||
if (t.has_value()) {
|
||||
transfer_in_progress = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!transfer_in_progress) {
|
||||
for (const auto& t : peer.recv_transfers) {
|
||||
if (t.has_value()) {
|
||||
transfer_in_progress = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (transfer_in_progress) {
|
||||
// ~15ms for up to 1mb/s
|
||||
// ~5ms for up to 4mb/s
|
||||
return 0.005f; // 5ms
|
||||
} else {
|
||||
return 1.f; // once a sec might be too little
|
||||
}
|
||||
}
|
||||
|
||||
void NGCFT1::NGC_FT1_send_request_private(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
@ -487,7 +529,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
|
||||
|
||||
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
|
||||
//#if !NDEBUG
|
||||
std::cout << "NGCFT1: FT1_INIT_ACK\n";
|
||||
std::cout << "NGCFT1: FT1_INIT_ACK mds:" << e.max_lossy_data_size << "\n";
|
||||
//#endif
|
||||
|
||||
// we now should start sending data
|
||||
@ -507,10 +549,28 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
|
||||
|
||||
using State = Group::Peer::SendTransfer::State;
|
||||
if (transfer.state != State::INIT_SENT) {
|
||||
std::cerr << "NGCFT1 error: inti_ack but not in INIT_SENT state\n";
|
||||
std::cerr << "NGCFT1 error: init_ack but not in INIT_SENT state\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
// negotiated packet_data_size
|
||||
const auto negotiated_packet_data_size = std::min<uint32_t>(e.max_lossy_data_size, _t.toxGroupMaxCustomLossyPacketLength()-4);
|
||||
// TODO: reset cca with new pkg size
|
||||
if (!peer.cca) {
|
||||
// make random max of [1020-1220]
|
||||
const uint32_t random_max_data_size = (1024-4) + _rng()%201;
|
||||
const uint32_t randomized_negotiated_packet_data_size = std::min(negotiated_packet_data_size, random_max_data_size);
|
||||
|
||||
peer.max_packet_data_size = randomized_negotiated_packet_data_size;
|
||||
|
||||
std::cerr << "NGCFT1: creating cca with max:" << peer.max_packet_data_size << "\n";
|
||||
|
||||
peer.cca = std::make_unique<CUBIC>(peer.max_packet_data_size);
|
||||
//peer.cca = std::make_unique<LEDBAT>(peer.max_packet_data_size);
|
||||
//peer.cca = std::make_unique<FlowOnly>(peer.max_packet_data_size);
|
||||
//peer.cca->max_byterate_allowed = 1.f *1024*1024;
|
||||
}
|
||||
|
||||
// iterate will now call NGC_FT1_send_data_cb
|
||||
transfer.state = State::SENDING;
|
||||
transfer.time_since_activity = 0.f;
|
||||
@ -520,7 +580,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
|
||||
|
||||
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
|
||||
#if !NDEBUG
|
||||
std::cout << "NGCFT1: FT1_DATA\n";
|
||||
//std::cout << "NGCFT1: FT1_DATA\n";
|
||||
#endif
|
||||
|
||||
if (e.data.empty()) {
|
||||
@ -599,7 +659,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
|
||||
|
||||
Group::Peer& peer = groups[e.group_number].peers[e.peer_number];
|
||||
if (!peer.send_transfers[e.transfer_id].has_value()) {
|
||||
std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n";
|
||||
// we delete directly, packets might still be in flight (in practice they are when ce)
|
||||
//std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -625,7 +686,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
|
||||
|
||||
// delete if all packets acked
|
||||
if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) {
|
||||
std::cout << "NGCFT1: " << int(e.transfer_id) << " done\n";
|
||||
std::cout << "NGCFT1: " << int(e.transfer_id) << " done. wnd:" << peer.cca->getWindow() << "\n";
|
||||
dispatch(
|
||||
NGCFT1_Event::send_done,
|
||||
Events::NGCFT1_send_done{
|
||||
@ -711,7 +772,7 @@ bool NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
|
||||
}
|
||||
|
||||
// reset cca
|
||||
peer.cca = std::make_unique<CUBIC>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
|
||||
peer.cca.reset(); // dont actually reallocate
|
||||
|
||||
return false;
|
||||
}
|
||||
|
@ -2,22 +2,22 @@
|
||||
|
||||
// solanaceae port of tox_ngc_ft1
|
||||
|
||||
#include <solanaceae/toxcore/tox_interface.hpp>
|
||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||
#include <solanaceae/toxcore/tox_interface.hpp>
|
||||
|
||||
#include <solanaceae/ngc_ext/ngcext.hpp>
|
||||
#include "./cubic.hpp"
|
||||
//#include "./flow_only.hpp"
|
||||
//#include "./ledbat.hpp"
|
||||
#include "./cca.hpp"
|
||||
|
||||
#include "./rcv_buf.hpp"
|
||||
#include "./snd_buf.hpp"
|
||||
|
||||
#include "./ngcft1_file_kind.hpp"
|
||||
|
||||
#include <cstdint>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <memory>
|
||||
#include <random>
|
||||
|
||||
namespace Events {
|
||||
|
||||
@ -133,15 +133,19 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
|
||||
ToxEventProviderI& _tep;
|
||||
NGCEXTEventProviderI& _neep;
|
||||
|
||||
std::default_random_engine _rng{std::random_device{}()};
|
||||
|
||||
// TODO: config
|
||||
size_t acks_per_packet {3u}; // 3
|
||||
float init_retry_timeout_after {5.f}; // 10sec
|
||||
float sending_give_up_after {30.f}; // 30sec
|
||||
float init_retry_timeout_after {4.f};
|
||||
float sending_give_up_after {15.f}; // 30sec (per active transfer)
|
||||
|
||||
|
||||
struct Group {
|
||||
struct Peer {
|
||||
std::unique_ptr<CCAI> cca = std::make_unique<CUBIC>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
|
||||
uint32_t max_packet_data_size {500-4};
|
||||
//std::unique_ptr<CCAI> cca = std::make_unique<CUBIC>(max_packet_data_size); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
|
||||
std::unique_ptr<CCAI> cca;
|
||||
|
||||
struct RecvTransfer {
|
||||
uint32_t file_kind;
|
||||
@ -188,6 +192,9 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
|
||||
};
|
||||
std::array<std::optional<SendTransfer>, 256> send_transfers;
|
||||
size_t next_send_transfer_idx {0}; // next id will be 0
|
||||
size_t next_send_transfer_send_idx {0};
|
||||
|
||||
size_t active_send_transfers {0};
|
||||
};
|
||||
std::map<uint32_t, Peer> peers;
|
||||
};
|
||||
@ -201,7 +208,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
|
||||
bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size);
|
||||
bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size);
|
||||
|
||||
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set);
|
||||
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
|
||||
void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
|
||||
|
||||
public:
|
||||
@ -211,7 +218,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
|
||||
NGCEXTEventProviderI& neep
|
||||
);
|
||||
|
||||
void iterate(float delta);
|
||||
float iterate(float delta);
|
||||
|
||||
public: // ft1 api
|
||||
// TODO: public variant?
|
||||
|
@ -1,58 +1,76 @@
|
||||
#pragma once
|
||||
|
||||
#include <solanaceae/message3/file.hpp>
|
||||
#include <solanaceae/file/file2.hpp>
|
||||
|
||||
#include "./mio.hpp"
|
||||
|
||||
#include <filesystem>
|
||||
#include <fstream>
|
||||
#include <iostream>
|
||||
#include <cstring>
|
||||
|
||||
struct FileRWMapped : public FileI {
|
||||
struct File2RWMapped : public File2I {
|
||||
mio::ummap_sink _file_map;
|
||||
|
||||
// TODO: add truncate support?
|
||||
FileRWMapped(std::string_view file_path, uint64_t file_size) {
|
||||
_file_size = file_size;
|
||||
// TODO: rw always true?
|
||||
File2RWMapped(std::string_view file_path, int64_t file_size = -1) : File2I(true, true) {
|
||||
std::filesystem::path native_file_path{file_path};
|
||||
|
||||
if (!std::filesystem::exists(file_path)) {
|
||||
std::ofstream(std::string{file_path}) << '\0'; // force create the file
|
||||
if (!std::filesystem::exists(native_file_path)) {
|
||||
std::ofstream(native_file_path) << '\0'; // force create the file
|
||||
}
|
||||
|
||||
_file_size = std::filesystem::file_size(native_file_path);
|
||||
if (file_size >= 0 && _file_size != file_size) {
|
||||
_file_size = file_size;
|
||||
std::filesystem::resize_file(native_file_path, file_size); // ensure size, usually sparse
|
||||
}
|
||||
std::filesystem::resize_file(file_path, file_size); // ensure size, usually sparse
|
||||
|
||||
std::error_code err;
|
||||
// sink, is also read
|
||||
_file_map.map(std::string{file_path}, 0, file_size, err);
|
||||
_file_map.map(native_file_path.u8string(), 0, _file_size, err);
|
||||
|
||||
if (err) {
|
||||
// TODO: errro
|
||||
std::cerr << "FileRWMapped error: mapping file failed " << err << "\n";
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
virtual ~FileRWMapped(void) override {}
|
||||
virtual ~File2RWMapped(void) override {}
|
||||
|
||||
bool isGood(void) override {
|
||||
return _file_map.is_mapped();
|
||||
}
|
||||
|
||||
std::vector<uint8_t> read(uint64_t pos, uint64_t size) override {
|
||||
if (pos+size > _file_size) {
|
||||
//assert(false && "read past end");
|
||||
return {};
|
||||
}
|
||||
|
||||
return {_file_map.data()+pos, _file_map.data()+(pos+size)};
|
||||
}
|
||||
|
||||
bool write(uint64_t pos, const std::vector<uint8_t>& data) override {
|
||||
if (pos+data.size() > _file_size) {
|
||||
bool write(const ByteSpan data, int64_t pos = -1) override {
|
||||
// TODO: support streaming write
|
||||
if (pos < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::memcpy(_file_map.data()+pos, data.data(), data.size());
|
||||
if (data.empty()) {
|
||||
return true; // false?
|
||||
}
|
||||
|
||||
// file size is fix for mmaped files
|
||||
if (pos+data.size > _file_size) {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::memcpy(_file_map.data()+pos, data.ptr, data.size);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
ByteSpanWithOwnership read(uint64_t size, int64_t pos = -1) override {
|
||||
if (pos+size > _file_size) {
|
||||
//assert(false && "read past end");
|
||||
return ByteSpan{};
|
||||
}
|
||||
|
||||
// return non-owning
|
||||
return ByteSpan{_file_map.data()+pos, size};
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -1,14 +1,12 @@
|
||||
#include "./sha1_ngcft1.hpp"
|
||||
|
||||
#include <solanaceae/toxcore/utils.hpp>
|
||||
#include <solanaceae/util/utils.hpp>
|
||||
|
||||
#include <solanaceae/contact/components.hpp>
|
||||
#include <solanaceae/tox_contacts/components.hpp>
|
||||
#include <solanaceae/message3/components.hpp>
|
||||
#include <solanaceae/tox_messages/components.hpp>
|
||||
|
||||
#include <solanaceae/message3/file_r_file.hpp>
|
||||
|
||||
#include "./ft1_sha1_info.hpp"
|
||||
#include "./hash_utils.hpp"
|
||||
|
||||
@ -26,11 +24,11 @@
|
||||
|
||||
namespace Message::Components {
|
||||
|
||||
using Content = ContentHandle;
|
||||
using Content = ObjectHandle;
|
||||
|
||||
} // Message::Components
|
||||
|
||||
// TODO: rename to content components
|
||||
// TODO: rename to object components
|
||||
namespace Components {
|
||||
|
||||
struct Messages {
|
||||
@ -112,8 +110,7 @@ static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
|
||||
}
|
||||
}
|
||||
|
||||
void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash) {
|
||||
// TODO: transfers
|
||||
void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash) {
|
||||
for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) {
|
||||
// if already in queue
|
||||
if (i_g == group_number && i_p == peer_number && i_h == hash) {
|
||||
@ -123,6 +120,32 @@ void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_numbe
|
||||
}
|
||||
}
|
||||
|
||||
// check for running transfer
|
||||
if (_sending_transfers.count(combineIds(group_number, peer_number))) {
|
||||
for (const auto& [_, transfer] : _sending_transfers.at(combineIds(group_number, peer_number))) {
|
||||
if (std::holds_alternative<SendingTransfer::Info>(transfer.v)) {
|
||||
// ignore info
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto& t_c = std::get<SendingTransfer::Chunk>(transfer.v);
|
||||
|
||||
if (content != t_c.content) {
|
||||
// ignore different content
|
||||
continue;
|
||||
}
|
||||
|
||||
auto chunk_idx_vec = content.get<Components::FT1ChunkSHA1Cache>().chunkIndices(hash);
|
||||
|
||||
for (size_t idx : chunk_idx_vec) {
|
||||
if (idx == t_c.chunk_index) {
|
||||
// already sending
|
||||
return; // skip
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// not in queue yet
|
||||
_queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, content, hash, 0.f));
|
||||
}
|
||||
@ -131,7 +154,7 @@ uint64_t SHA1_NGCFT1::combineIds(const uint32_t group_number, const uint32_t pee
|
||||
return (uint64_t(group_number) << 32) | peer_number;
|
||||
}
|
||||
|
||||
void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
|
||||
void SHA1_NGCFT1::updateMessages(ObjectHandle ce) {
|
||||
assert(ce.all_of<Components::Messages>());
|
||||
|
||||
for (auto msg : ce.get<Components::Messages>().messages) {
|
||||
@ -160,7 +183,7 @@ void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ContentHandle ce) {
|
||||
std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ObjectHandle ce) {
|
||||
// get a list of peers we can request this file from
|
||||
// TODO: randomly request from non SuspectedParticipants
|
||||
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
|
||||
@ -194,6 +217,10 @@ std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(C
|
||||
continue;
|
||||
}
|
||||
|
||||
if (_cr.all_of<Contact::Components::TagSelfStrong>(child)) {
|
||||
continue; // skip self
|
||||
}
|
||||
|
||||
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
|
||||
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
|
||||
un_tox_peers.push_back(tgpe.peer_number);
|
||||
@ -219,11 +246,13 @@ std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(C
|
||||
}
|
||||
|
||||
SHA1_NGCFT1::SHA1_NGCFT1(
|
||||
ObjectStore2& os,
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModel& rmm,
|
||||
NGCFT1& nft,
|
||||
ToxContactModel2& tcm
|
||||
) :
|
||||
_os(os),
|
||||
_cr(cr),
|
||||
_rmm(rmm),
|
||||
_nft(nft),
|
||||
@ -266,8 +295,9 @@ void SHA1_NGCFT1::iterate(float delta) {
|
||||
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
|
||||
it->second.time_since_activity += delta;
|
||||
|
||||
// if we have not heard for 10sec, timeout
|
||||
if (it->second.time_since_activity >= 10.f) {
|
||||
// if we have not heard for 2min, timeout (lower level event on real timeout)
|
||||
// TODO: do we really need this if we get events?
|
||||
if (it->second.time_since_activity >= 120.f) {
|
||||
std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n";
|
||||
it = peer_it->second.erase(it);
|
||||
} else {
|
||||
@ -320,8 +350,8 @@ void SHA1_NGCFT1::iterate(float delta) {
|
||||
}
|
||||
|
||||
{ // requested info timers
|
||||
std::vector<Content> timed_out;
|
||||
_contentr.view<Components::ReRequestInfoTimer>().each([delta, &timed_out](Content e, Components::ReRequestInfoTimer& rrit) {
|
||||
std::vector<Object> timed_out;
|
||||
_os.registry().view<Components::ReRequestInfoTimer>().each([delta, &timed_out](Object e, Components::ReRequestInfoTimer& rrit) {
|
||||
rrit.timer += delta;
|
||||
|
||||
// 15sec, TODO: config
|
||||
@ -331,12 +361,13 @@ void SHA1_NGCFT1::iterate(float delta) {
|
||||
});
|
||||
for (const auto e : timed_out) {
|
||||
// TODO: avoid dups
|
||||
_queue_content_want_info.push_back({_contentr, e});
|
||||
_contentr.remove<Components::ReRequestInfoTimer>(e);
|
||||
_queue_content_want_info.push_back(_os.objectHandle(e));
|
||||
_os.registry().remove<Components::ReRequestInfoTimer>(e);
|
||||
// TODO: throw update?
|
||||
}
|
||||
}
|
||||
{ // requested chunk timers
|
||||
_contentr.view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
|
||||
_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
|
||||
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
|
||||
it->second += delta;
|
||||
|
||||
@ -522,11 +553,11 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
|
||||
// first, open file for write(+readback)
|
||||
std::string full_file_path{e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path};
|
||||
// TODO: replace with filesystem or something
|
||||
// TODO: ensure dir exists
|
||||
if (full_file_path.back() != '/') {
|
||||
full_file_path += "/";
|
||||
}
|
||||
|
||||
// ensure dir exists
|
||||
std::filesystem::create_directories(full_file_path);
|
||||
|
||||
const auto& info = ce.get<Components::FT1InfoSHA1>();
|
||||
@ -534,14 +565,13 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
|
||||
|
||||
ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{full_file_path});
|
||||
|
||||
std::unique_ptr<FileRWMapped> file_impl;
|
||||
const bool file_exists = std::filesystem::exists(full_file_path);
|
||||
|
||||
file_impl = std::make_unique<FileRWMapped>(full_file_path, info.file_size);
|
||||
std::unique_ptr<File2I> file_impl = std::make_unique<File2RWMapped>(full_file_path, info.file_size);
|
||||
|
||||
if (!file_impl->isGood()) {
|
||||
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
|
||||
//e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop
|
||||
// we failed opening that filepath, so we should offer the user the oportunity to save it differently
|
||||
e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -557,12 +587,11 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
|
||||
// iterate existing file
|
||||
for (size_t i = 0; i < info.chunks.size(); i++) {
|
||||
const uint64_t chunk_size = info.chunkSize(i);
|
||||
auto existing_data = file_impl->read(i*uint64_t(info.chunk_size), chunk_size);
|
||||
assert(existing_data.size() == chunk_size);
|
||||
auto existing_data = file_impl->read(chunk_size, i*uint64_t(info.chunk_size));
|
||||
|
||||
// TODO: avoid copy
|
||||
|
||||
const auto data_hash = SHA1Digest{hash_sha1(existing_data.data(), existing_data.size())};
|
||||
assert(existing_data.size == chunk_size);
|
||||
if (existing_data.size == chunk_size) {
|
||||
const auto data_hash = SHA1Digest{hash_sha1(existing_data.ptr, existing_data.size)};
|
||||
const bool data_equal = data_hash == info.chunks.at(i);
|
||||
|
||||
cc.have_chunk.push_back(data_equal);
|
||||
@ -574,6 +603,9 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
|
||||
} else {
|
||||
//std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n";
|
||||
}
|
||||
} else {
|
||||
// error reading?
|
||||
}
|
||||
|
||||
_chunks[info.chunks[i]] = ce;
|
||||
cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
|
||||
@ -801,9 +833,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
|
||||
for (const auto chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
|
||||
const auto offset_into_file = chunk_index* ce.get<Components::FT1InfoSHA1>().chunk_size;
|
||||
|
||||
// TODO: avoid temporary copy
|
||||
// TODO: check return
|
||||
if (!file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size})) {
|
||||
if (!file->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
|
||||
std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
|
||||
}
|
||||
}
|
||||
@ -841,14 +871,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
|
||||
auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
|
||||
const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
|
||||
// TODO: should we really use file?
|
||||
const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read((chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset, e.data_size);
|
||||
const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read(
|
||||
e.data_size,
|
||||
(chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset
|
||||
);
|
||||
|
||||
// TODO: optimize
|
||||
for (size_t i = 0; i < e.data_size && i < data.size(); i++) {
|
||||
for (size_t i = 0; i < e.data_size && i < data.size; i++) {
|
||||
e.data[i] = data[i];
|
||||
}
|
||||
|
||||
chunk_transfer.content.get_or_emplace<Message::Components::Transfer::BytesSent>().total += data.size();
|
||||
chunk_transfer.content.get_or_emplace<Message::Components::Transfer::BytesSent>().total += data.size;
|
||||
// TODO: add event to propergate to messages
|
||||
//_rmm.throwEventUpdate(transfer); // should we?
|
||||
|
||||
@ -929,10 +962,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
const auto chunk_size = info.chunkSize(chunk_index);
|
||||
assert(offset_into_file+chunk_size <= info.file_size);
|
||||
|
||||
const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(offset_into_file, chunk_size);
|
||||
const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(chunk_size, offset_into_file);
|
||||
assert(!chunk_data.empty());
|
||||
|
||||
// check hash of chunk
|
||||
auto got_hash = hash_sha1(chunk_data.data(), chunk_data.size());
|
||||
auto got_hash = hash_sha1(chunk_data.ptr, chunk_data.size);
|
||||
if (info.chunks.at(chunk_index) == got_hash) {
|
||||
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
|
||||
|
||||
@ -954,7 +988,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
// HACK: remap file, to clear ram
|
||||
|
||||
// TODO: error checking
|
||||
ce.get<Message::Components::Transfer::File>() = std::make_unique<FileRWMapped>(
|
||||
ce.get<Message::Components::Transfer::File>() = std::make_unique<File2RWMapped>(
|
||||
ce.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(),
|
||||
info.file_size
|
||||
);
|
||||
@ -962,7 +996,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
|
||||
// good chunk
|
||||
// TODO: have wasted + metadata
|
||||
ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size();
|
||||
ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -1042,19 +1076,29 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
|
||||
//reg.emplace<Components::TimestampWritten>(new_msg_e, 0);
|
||||
reg.emplace<Message::Components::Timestamp>(new_msg_e, ts); // reactive?
|
||||
|
||||
reg.emplace<Message::Components::TagUnread>(new_msg_e);
|
||||
|
||||
{ // by whom
|
||||
auto& synced_by = reg.get_or_emplace<Message::Components::SyncedBy>(new_msg_e).list;
|
||||
synced_by.emplace(self_c);
|
||||
reg.get_or_emplace<Message::Components::SyncedBy>(new_msg_e).ts.try_emplace(self_c, ts);
|
||||
}
|
||||
|
||||
{ // we received it, so we have it
|
||||
auto& rb = reg.get_or_emplace<Message::Components::ReceivedBy>(new_msg_e).ts;
|
||||
rb.try_emplace(c, ts);
|
||||
// TODO: how do we handle partial files???
|
||||
// tox ft rn only sets self if the file was received fully
|
||||
rb.try_emplace(self_c, ts);
|
||||
}
|
||||
|
||||
// check if content exists
|
||||
const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
|
||||
ContentHandle ce;
|
||||
ObjectHandle ce;
|
||||
if (_info_to_content.count(sha1_info_hash)) {
|
||||
ce = _info_to_content.at(sha1_info_hash);
|
||||
std::cout << "SHA1_NGCFT1: new message has existing content\n";
|
||||
} else {
|
||||
ce = {_contentr, _contentr.create()};
|
||||
// TODO: backend
|
||||
ce = {_os.registry(), _os.registry().create()};
|
||||
_info_to_content[sha1_info_hash] = ce;
|
||||
std::cout << "SHA1_NGCFT1: new message has new content\n";
|
||||
|
||||
@ -1142,9 +1186,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
|
||||
file_name_ = std::string(file_name),
|
||||
file_path_ = std::string(file_path)
|
||||
]() mutable {
|
||||
// TODO: rw?
|
||||
// TODO: memory mapped would be king
|
||||
auto file_impl = std::make_unique<FileRFile>(file_path_);
|
||||
auto file_impl = std::make_unique<File2RWMapped>(file_path_, -1);
|
||||
if (!file_impl->isGood()) {
|
||||
{
|
||||
std::lock_guard l{self->_info_builder_queue_mutex};
|
||||
@ -1163,19 +1205,19 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
|
||||
FT1InfoSHA1 sha1_info;
|
||||
// build info
|
||||
sha1_info.file_name = file_name_;
|
||||
sha1_info.file_size = file_impl->_file_size;
|
||||
sha1_info.file_size = file_impl->_file_size; // TODO: remove the reliance on implementation details
|
||||
|
||||
{ // build chunks
|
||||
// HACK: load file fully
|
||||
// TODO: the speed is truly horrid
|
||||
const auto file_data = file_impl->read(0, file_impl->_file_size);
|
||||
// ... its only a hack if its not memory mapped, but reading in chunk_sized chunks is probably a good idea anyway
|
||||
const auto file_data = file_impl->read(file_impl->_file_size, 0);
|
||||
size_t i = 0;
|
||||
for (; i + sha1_info.chunk_size < file_data.size(); i += sha1_info.chunk_size) {
|
||||
sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, sha1_info.chunk_size));
|
||||
for (; i + sha1_info.chunk_size < file_data.size; i += sha1_info.chunk_size) {
|
||||
sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, sha1_info.chunk_size));
|
||||
}
|
||||
|
||||
if (i < file_data.size()) {
|
||||
sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, file_data.size()-i));
|
||||
if (i < file_data.size) {
|
||||
sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, file_data.size-i));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1194,7 +1236,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
|
||||
]() mutable { //
|
||||
// back on iterate thread
|
||||
|
||||
auto file_impl = std::make_unique<FileRFile>(file_path_);
|
||||
auto file_impl = std::make_unique<File2RWMapped>(file_path_, sha1_info.file_size);
|
||||
if (!file_impl->isGood()) {
|
||||
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n";
|
||||
return;
|
||||
@ -1211,7 +1253,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
|
||||
std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n";
|
||||
|
||||
// check if content exists
|
||||
ContentHandle ce;
|
||||
ObjectHandle ce;
|
||||
if (self->_info_to_content.count(sha1_info_hash)) {
|
||||
ce = self->_info_to_content.at(sha1_info_hash);
|
||||
|
||||
@ -1280,7 +1322,8 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
|
||||
it = self->_queue_content_want_chunk.erase(it);
|
||||
}
|
||||
} else {
|
||||
ce = {self->_contentr, self->_contentr.create()};
|
||||
// TODO: backend
|
||||
ce = {self->_os.registry(), self->_os.registry().create()};
|
||||
self->_info_to_content[sha1_info_hash] = ce;
|
||||
|
||||
ce.emplace<Components::FT1InfoSHA1>(sha1_info);
|
||||
@ -1325,6 +1368,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
|
||||
reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
|
||||
reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
|
||||
reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
|
||||
reg_ptr->emplace<Message::Components::Read>(msg_e, ts);
|
||||
|
||||
reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e);
|
||||
reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
|
||||
@ -1357,10 +1401,6 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
|
||||
// TODO: check return
|
||||
self->_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size());
|
||||
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
|
||||
|
||||
// TODO: generalize?
|
||||
auto& synced_by = reg_ptr->emplace<Message::Components::SyncedBy>(msg_e).list;
|
||||
synced_by.emplace(c_self);
|
||||
} else if (
|
||||
// non online group
|
||||
self->_cr.any_of<Contact::Components::ToxGroupPersistent>(c)
|
||||
@ -1368,12 +1408,11 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
|
||||
// create msg_id
|
||||
const uint32_t message_id = randombytes_random();
|
||||
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
|
||||
|
||||
// TODO: generalize?
|
||||
auto& synced_by = reg_ptr->emplace<Message::Components::SyncedBy>(msg_e).list;
|
||||
synced_by.emplace(c_self);
|
||||
}
|
||||
|
||||
reg_ptr->get_or_emplace<Message::Components::SyncedBy>(msg_e).ts.try_emplace(c_self, ts);
|
||||
reg_ptr->get_or_emplace<Message::Components::ReceivedBy>(msg_e).ts.try_emplace(c_self, ts);
|
||||
|
||||
self->_rmm.throwEventConstruct(*reg_ptr, msg_e);
|
||||
|
||||
// TODO: place in iterate?
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
// solanaceae port of sha1 fts for NGCFT1
|
||||
|
||||
#include <solanaceae/object_store/object_store.hpp>
|
||||
#include <solanaceae/contact/contact_model3.hpp>
|
||||
#include <solanaceae/message3/registry_message_model.hpp>
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
@ -20,11 +21,9 @@
|
||||
#include <mutex>
|
||||
#include <list>
|
||||
|
||||
enum class Content : uint32_t {};
|
||||
using ContentRegistry = entt::basic_registry<Content>;
|
||||
using ContentHandle = entt::basic_handle<ContentRegistry>;
|
||||
|
||||
class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
|
||||
ObjectStore2& _os;
|
||||
// TODO: backend abstraction
|
||||
Contact3Registry& _cr;
|
||||
RegistryMessageModel& _rmm;
|
||||
NGCFT1& _nft;
|
||||
@ -32,21 +31,18 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
|
||||
|
||||
std::minstd_rand _rng {1337*11};
|
||||
|
||||
// registry per group?
|
||||
ContentRegistry _contentr;
|
||||
|
||||
// limit this to each group?
|
||||
entt::dense_map<SHA1Digest, ContentHandle> _info_to_content;
|
||||
entt::dense_map<SHA1Digest, ObjectHandle> _info_to_content;
|
||||
|
||||
// sha1 chunk index
|
||||
// TODO: optimize lookup
|
||||
// TODO: multiple contents. hashes might be unique, but data is not
|
||||
entt::dense_map<SHA1Digest, ContentHandle> _chunks;
|
||||
entt::dense_map<SHA1Digest, ObjectHandle> _chunks;
|
||||
|
||||
// group_number, peer_number, content, chunk_hash, timer
|
||||
std::deque<std::tuple<uint32_t, uint32_t, ContentHandle, SHA1Digest, float>> _queue_requested_chunk;
|
||||
std::deque<std::tuple<uint32_t, uint32_t, ObjectHandle, SHA1Digest, float>> _queue_requested_chunk;
|
||||
//void queueUpRequestInfo(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash);
|
||||
void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash);
|
||||
void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash);
|
||||
|
||||
struct SendingTransfer {
|
||||
struct Info {
|
||||
@ -56,7 +52,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
|
||||
};
|
||||
|
||||
struct Chunk {
|
||||
ContentHandle content;
|
||||
ObjectHandle content;
|
||||
size_t chunk_index; // <.< remove offset_into_file
|
||||
//uint64_t offset_into_file;
|
||||
// or data?
|
||||
@ -72,14 +68,14 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
|
||||
|
||||
struct ReceivingTransfer {
|
||||
struct Info {
|
||||
ContentHandle content;
|
||||
ObjectHandle content;
|
||||
// copy of info data
|
||||
// too large?
|
||||
std::vector<uint8_t> info_data;
|
||||
};
|
||||
|
||||
struct Chunk {
|
||||
ContentHandle content;
|
||||
ObjectHandle content;
|
||||
std::vector<size_t> chunk_indices;
|
||||
// or data?
|
||||
// if memmapped, this would be just a pointer
|
||||
@ -93,8 +89,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
|
||||
entt::dense_map<uint64_t, entt::dense_map<uint8_t, ReceivingTransfer>> _receiving_transfers;
|
||||
|
||||
// makes request rotate around open content
|
||||
std::deque<ContentHandle> _queue_content_want_info;
|
||||
std::deque<ContentHandle> _queue_content_want_chunk;
|
||||
std::deque<ObjectHandle> _queue_content_want_info;
|
||||
std::deque<ObjectHandle> _queue_content_want_chunk;
|
||||
|
||||
std::atomic_bool _info_builder_dirty {false};
|
||||
std::mutex _info_builder_queue_mutex;
|
||||
@ -108,20 +104,21 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
|
||||
|
||||
static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number);
|
||||
|
||||
void updateMessages(ContentHandle ce);
|
||||
void updateMessages(ObjectHandle ce);
|
||||
|
||||
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ContentHandle ce);
|
||||
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ObjectHandle ce);
|
||||
|
||||
public: // TODO: config
|
||||
bool _udp_only {false};
|
||||
|
||||
size_t _max_concurrent_in {4};
|
||||
size_t _max_concurrent_out {6};
|
||||
size_t _max_concurrent_in {6};
|
||||
size_t _max_concurrent_out {8};
|
||||
// TODO: probably also includes running transfers rn (meh)
|
||||
size_t _max_pending_requests {32}; // per content
|
||||
|
||||
public:
|
||||
SHA1_NGCFT1(
|
||||
ObjectStore2& os,
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModel& rmm,
|
||||
NGCFT1& nft,
|
||||
|
Reference in New Issue
Block a user