37 Commits

Author SHA1 Message Date
ae3dc74933 something here broke it
- accounting for rounded down bytes
2024-06-04 19:37:33 +02:00
0eb30246a8 small refactor and print in flight packages when timing out 2024-05-31 17:03:22 +02:00
c52ac19285 print window on done 2024-05-31 15:36:18 +02:00
1231e792a7 lift reduction increase threshold 2024-05-27 18:07:19 +02:00
319e754aff rework time since reduction to only grow if cca is active, also start warm 2024-05-27 11:59:32 +02:00
a4201f4407 track timepoint of last update 2024-05-27 11:31:36 +02:00
57575330dd port to file2, other minor improvements 2024-05-27 11:20:37 +02:00
eb2a19d8f3 hack replace content with improper use of objectstore 2024-04-29 11:55:11 +02:00
dfcb5dee97 adopt receivedby rename 2024-04-20 15:12:05 +02:00
0d40d1abaa dont request from self 2024-04-15 11:48:17 +02:00
61b667a4aa reserve memory to reduce number of allocations in hotspots
especially on the sender side
2024-03-16 11:30:55 +01:00
c03282eae8 actually fix the timeout for slow connections 2024-03-09 18:06:49 +01:00
5fd1f2ab84 fix missing virtual destructor and scale tranfer timeout with concurency 2024-03-05 16:48:58 +01:00
bccd04316a tweak them numbers again 2024-02-04 20:04:36 +01:00
ccf66fb80c update hex conv 2024-01-13 22:34:42 +01:00
ea032244e7 remote comps 2024-01-12 18:55:41 +01:00
0df0760c06 failing to send is now also a congestion event (hacky and only the first time we send data) 2024-01-11 00:48:57 +01:00
f02b03da7c update to plugin 7 and refactor (should improve speed) 2024-01-07 17:23:06 +01:00
103f36f2d2 update to new ngc_events 2023-12-26 21:16:35 +01:00
ad918a3253 add random cap (1020-1220) and tighten cubic rate limit more 2023-12-15 15:31:32 +01:00
70cea0d219 small fixes 2023-12-13 19:38:55 +01:00
b0e2cab17a limit the amount it can send in a single tick (speed boost :D) 2023-12-13 17:56:56 +01:00
0a53a76eb3 maybe filesystem::path can help us 2023-12-13 16:09:34 +01:00
5995059777 better error log + fix broken accept on file creation error 2023-12-13 15:45:36 +01:00
abf2645099 fix include order 2023-11-12 19:58:57 +01:00
7c16c54649 only decrease window on congestion if prev max window was not yet reached yet 2023-10-16 19:51:56 +02:00
a80e74065c ignore requests for running transfers 2023-10-15 22:02:34 +02:00
77f21f01e9 extend the protocol to support larger data packets and set it to the new tox constants numbers 2023-10-11 03:00:03 +02:00
27fd9e688b unread/read status 2023-09-30 00:27:01 +02:00
f28e79dcbc fix missing include 2023-09-15 20:07:19 +02:00
7af5fda0a6 better filter and cubic fixes 2023-09-08 00:41:25 +02:00
f91780c602 filter simple packet drops by not counting the first 4 packets arriving out of order 2023-09-07 12:26:54 +02:00
1e6929c93b only cound a ce once 2023-09-02 13:28:32 +02:00
81a353570b more tweaking 2023-09-02 02:28:22 +02:00
070585ab3d remeber the first sending transfer that could not send any packets and start there next iterate 2023-09-01 23:20:03 +02:00
ba8befbb2d more fixes 2023-09-01 17:34:05 +02:00
a1a9bf886a make cubic and flow more resilient 2023-09-01 15:51:28 +02:00
15 changed files with 596 additions and 285 deletions

View File

@ -65,5 +65,7 @@ target_link_libraries(solanaceae_sha1_ngcft1 PUBLIC
sha1::sha1 sha1::sha1
solanaceae_tox_contacts solanaceae_tox_contacts
solanaceae_message3 solanaceae_message3
solanaceae_object_store
solanaceae_file2
) )

View File

@ -3,8 +3,8 @@
#include <iostream> #include <iostream>
NGCEXTEventProvider::NGCEXTEventProvider(ToxEventProviderI& tep) : _tep(tep) { NGCEXTEventProvider::NGCEXTEventProvider(ToxEventProviderI& tep) : _tep(tep) {
_tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_CUSTOM_PACKET); _tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PACKET);
_tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET); _tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET);
} }
#define _DATA_HAVE(x, error) if ((data_size - curser) < (x)) { error; } #define _DATA_HAVE(x, error) if ((data_size - curser) < (x)) { error; }
@ -112,6 +112,8 @@ bool NGCEXTEventProvider::parse_ft1_init_ack(
_DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false) _DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
e.transfer_id = data[curser++]; e.transfer_id = data[curser++];
e.max_lossy_data_size = 500-4; // -4 and 500 are hardcoded
return dispatch( return dispatch(
NGCEXT_Event::FT1_INIT_ACK, NGCEXT_Event::FT1_INIT_ACK,
e e
@ -224,6 +226,41 @@ bool NGCEXTEventProvider::parse_ft1_message(
); );
} }
bool NGCEXTEventProvider::parse_ft1_init_ack_v2(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
) {
if (!_private) {
std::cerr << "NGCEXT: ft1_init_ack_v2 cant be public\n";
return false;
}
Events::NGCEXT_ft1_init_ack e;
e.group_number = group_number;
e.peer_number = peer_number;
size_t curser = 0;
// - 1 byte (temporary_file_tf_id)
_DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
e.transfer_id = data[curser++];
// - 2 byte (max_lossy_data_size)
if ((data_size - curser) >= sizeof(e.max_lossy_data_size)) {
e.max_lossy_data_size = 0;
for (size_t i = 0; i < sizeof(e.max_lossy_data_size); i++, curser++) {
e.max_lossy_data_size |= uint16_t(data[curser]) << (i*8);
}
} else {
e.max_lossy_data_size = 500-4; // default
}
return dispatch(
NGCEXT_Event::FT1_INIT_ACK,
e
);
}
bool NGCEXTEventProvider::handlePacket( bool NGCEXTEventProvider::handlePacket(
const uint32_t group_number, const uint32_t group_number,
const uint32_t peer_number, const uint32_t peer_number,
@ -247,7 +284,8 @@ bool NGCEXTEventProvider::handlePacket(
case NGCEXT_Event::FT1_INIT: case NGCEXT_Event::FT1_INIT:
return parse_ft1_init(group_number, peer_number, data+1, data_size-1, _private); return parse_ft1_init(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::FT1_INIT_ACK: case NGCEXT_Event::FT1_INIT_ACK:
return parse_ft1_init_ack(group_number, peer_number, data+1, data_size-1, _private); //return parse_ft1_init_ack(group_number, peer_number, data+1, data_size-1, _private);
return parse_ft1_init_ack_v2(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::FT1_DATA: case NGCEXT_Event::FT1_DATA:
return parse_ft1_data(group_number, peer_number, data+1, data_size-1, _private); return parse_ft1_data(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::FT1_DATA_ACK: case NGCEXT_Event::FT1_DATA_ACK:

View File

@ -70,8 +70,6 @@ namespace Events {
// - X bytes (file_kind dependent id, differnt sizes) // - X bytes (file_kind dependent id, differnt sizes)
std::vector<uint8_t> file_id; std::vector<uint8_t> file_id;
// TODO: max supported lossy packet size
}; };
struct NGCEXT_ft1_init_ack { struct NGCEXT_ft1_init_ack {
@ -81,7 +79,8 @@ namespace Events {
// - 1 byte (transfer_id) // - 1 byte (transfer_id)
uint8_t transfer_id; uint8_t transfer_id;
// TODO: max supported lossy packet size // - 2 byte (self_max_lossy_data_size)
uint16_t max_lossy_data_size;
}; };
struct NGCEXT_ft1_data { struct NGCEXT_ft1_data {
@ -163,6 +162,7 @@ enum class NGCEXT_Event : uint8_t {
// acknowlage init (like an accept) // acknowlage init (like an accept)
// like tox ft control continue // like tox ft control continue
// - 1 byte (transfer_id) // - 1 byte (transfer_id)
// - 2 byte (self_max_lossy_data_size) (optional since v2)
FT1_INIT_ACK, FT1_INIT_ACK,
// TODO: init deny, speed up non acceptance // TODO: init deny, speed up non acceptance
@ -263,6 +263,12 @@ class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI {
bool _private bool _private
); );
bool parse_ft1_init_ack_v2(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
);
bool handlePacket( bool handlePacket(
const uint32_t group_number, const uint32_t group_number,
const uint32_t peer_number, const uint32_t peer_number,

View File

@ -38,22 +38,34 @@ struct CCAI {
//static_assert(maximum_segment_size == 574); // mesured in wireshark //static_assert(maximum_segment_size == 574); // mesured in wireshark
// flow control // flow control
float max_byterate_allowed {10*1024*1024}; // 10MiB/s //float max_byterate_allowed {100.f*1024*1024}; // 100MiB/s
float max_byterate_allowed {10.f*1024*1024}; // 10MiB/s
//float max_byterate_allowed {1.f*1024*1024}; // 1MiB/s
//float max_byterate_allowed {0.6f*1024*1024}; // 600KiB/s
//float max_byterate_allowed {0.5f*1024*1024}; // 500KiB/s
//float max_byterate_allowed {0.15f*1024*1024}; // 150KiB/s
//float max_byterate_allowed {0.05f*1024*1024}; // 50KiB/s
public: // api public: // api
CCAI(size_t maximum_segment_data_size) : MAXIMUM_SEGMENT_DATA_SIZE(maximum_segment_data_size) {} CCAI(size_t maximum_segment_data_size) : MAXIMUM_SEGMENT_DATA_SIZE(maximum_segment_data_size) {}
virtual ~CCAI(void) {}
// returns current rtt/delay
virtual float getCurrentDelay(void) const = 0;
// return the current believed window in bytes of how much data can be inflight, // return the current believed window in bytes of how much data can be inflight,
//virtual float getCWnD(void) const = 0; virtual float getWindow(void) = 0;
// TODO: api for how much data we should send // TODO: api for how much data we should send
// take time since last sent into account // take time since last sent into account
// respect max_byterate_allowed // respect max_byterate_allowed
virtual size_t canSend(void) = 0; virtual int64_t canSend(float time_delta) = 0;
// get the list of timed out seq_ids // get the list of timed out seq_ids
virtual std::vector<SeqIDType> getTimeouts(void) const = 0; virtual std::vector<SeqIDType> getTimeouts(void) const = 0;
virtual int64_t inFlightCount(void) const { return -1; }
public: // callbacks public: // callbacks
// data size is without overhead // data size is without overhead
virtual void onSent(SeqIDType seq, size_t data_size) = 0; virtual void onSent(SeqIDType seq, size_t data_size) = 0;

View File

@ -3,14 +3,25 @@
#include <cmath> #include <cmath>
#include <iostream> #include <iostream>
void CUBIC::updateReductionTimer(float time_delta) {
const auto now {getTimeNow()};
// only keep updating while the cca interaction is not too long ago
if (now - _time_point_last_update <= getCurrentDelay()*4.f) {
_time_since_reduction += time_delta;
}
}
void CUBIC::resetReductionTimer(void) {
_time_since_reduction = 0.f;
}
float CUBIC::getCWnD(void) const { float CUBIC::getCWnD(void) const {
const double K = cbrt( const double K = cbrt(
(_window_max * (1. - BETA)) / SCALING_CONSTANT (_window_max * (1. - BETA)) / SCALING_CONSTANT
); );
const double time_since_reduction = getTimeNow() - _time_point_reduction; const double TK = _time_since_reduction - K;
const double TK = time_since_reduction - K;
const double cwnd = const double cwnd =
SCALING_CONSTANT SCALING_CONSTANT
@ -33,29 +44,70 @@ float CUBIC::getCWnD(void) const {
} }
void CUBIC::onCongestion(void) { void CUBIC::onCongestion(void) {
if (getTimeNow() - _time_point_reduction >= getCurrentDelay()) { // 8 is probably too much (800ms for 100ms rtt)
const auto current_cwnd = getCWnD(); if (_time_since_reduction >= getCurrentDelay()*4.f) {
_time_point_reduction = getTimeNow(); const auto tmp_old_tp = _time_since_reduction;
_window_max = current_cwnd;
std::cout << "CONGESTION! cwnd:" << current_cwnd << "\n"; const auto current_cwnd = getCWnD(); // TODO: remove, only used by logging?
const auto current_wnd = getWindow(); // respects cwnd and fwnd
_bytes_leftover = 0;
resetReductionTimer();
if (current_cwnd < _window_max) {
// congestion before reaching the inflection point (prev window_max).
// reduce to wnd*beta to be fair
_window_max = current_wnd * BETA;
} else {
_window_max = current_wnd;
}
_window_max = std::max(_window_max, 2.0*MAXIMUM_SEGMENT_SIZE);
#if 1
std::cout << "----CONGESTION!"
<< " cwnd:" << current_cwnd
<< " wnd:" << current_wnd
<< " cwnd_max:" << _window_max
<< " pts:" << tmp_old_tp
<< " rtt:" << getCurrentDelay()
<< "\n"
;
#endif
} }
} }
size_t CUBIC::canSend(void) { float CUBIC::getWindow(void) {
const auto fspace_pkgs = FlowOnly::canSend(); return std::min<float>(getCWnD(), FlowOnly::getWindow());
}
int64_t CUBIC::canSend(float time_delta) {
const auto fspace_pkgs = FlowOnly::canSend(time_delta);
updateReductionTimer(time_delta);
if (fspace_pkgs == 0u) { if (fspace_pkgs == 0u) {
return 0u; return 0u;
} }
const int64_t cspace_bytes = getCWnD() - _in_flight_bytes; const auto window = getCWnD();
int64_t cspace_bytes = (window - _in_flight_bytes) + _bytes_leftover;
if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) { if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) {
return 0u; return 0u;
} }
// also limit to max sendrate per tick, which is usually smaller than window
// this is mostly to prevent spikes on empty windows
const auto rate = window / getCurrentDelay();
// we dont want this limit to fall below atleast 1 segment
const int64_t max_bytes_per_tick = std::max<int64_t>(rate * time_delta + 0.5f, MAXIMUM_SEGMENT_SIZE);
cspace_bytes = std::min<int64_t>(cspace_bytes, max_bytes_per_tick);
// limit to whole packets // limit to whole packets
size_t cspace_pkgs = std::floor(cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE; int64_t cspace_pkgs = (cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
_bytes_leftover = cspace_bytes - cspace_pkgs;
return std::min(cspace_pkgs, fspace_pkgs); return std::min(cspace_pkgs, fspace_pkgs);
} }

View File

@ -8,7 +8,8 @@ struct CUBIC : public FlowOnly {
//using clock = std::chrono::steady_clock; //using clock = std::chrono::steady_clock;
public: // config public: // config
static constexpr float BETA {0.7f}; //static constexpr float BETA {0.7f};
static constexpr float BETA {0.8f};
static constexpr float SCALING_CONSTANT {0.4f}; static constexpr float SCALING_CONSTANT {0.4f};
static constexpr float RTT_EMA_ALPHA = 0.1f; // 0.1 is very smooth, might need more static constexpr float RTT_EMA_ALPHA = 0.1f; // 0.1 is very smooth, might need more
@ -16,9 +17,14 @@ struct CUBIC : public FlowOnly {
// window size before last reduciton // window size before last reduciton
double _window_max {2.f * MAXIMUM_SEGMENT_SIZE}; // start with mss*2 double _window_max {2.f * MAXIMUM_SEGMENT_SIZE}; // start with mss*2
//double _window_last_max {2.f * MAXIMUM_SEGMENT_SIZE}; //double _window_last_max {2.f * MAXIMUM_SEGMENT_SIZE};
double _time_point_reduction {getTimeNow()};
double _time_since_reduction {12.f}; // warm start
int64_t _bytes_leftover {0};
private: private:
void updateReductionTimer(float time_delta);
void resetReductionTimer(void);
float getCWnD(void) const; float getCWnD(void) const;
// moving avg over the last few delay samples // moving avg over the last few delay samples
@ -31,11 +37,14 @@ struct CUBIC : public FlowOnly {
public: // api public: // api
CUBIC(size_t maximum_segment_data_size) : FlowOnly(maximum_segment_data_size) {} CUBIC(size_t maximum_segment_data_size) : FlowOnly(maximum_segment_data_size) {}
virtual ~CUBIC(void) {}
float getWindow(void) override;
// TODO: api for how much data we should send // TODO: api for how much data we should send
// take time since last sent into account // take time since last sent into account
// respect max_byterate_allowed // respect max_byterate_allowed
size_t canSend(void) override; int64_t canSend(float time_delta) override;
// get the list of timed out seq_ids // get the list of timed out seq_ids
//std::vector<SeqIDType> getTimeouts(void) const override; //std::vector<SeqIDType> getTimeouts(void) const override;

View File

@ -6,10 +6,16 @@
#include <algorithm> #include <algorithm>
float FlowOnly::getCurrentDelay(void) const { float FlowOnly::getCurrentDelay(void) const {
return std::min(_rtt_ema, RTT_MAX); // below 1ms is useless
return std::clamp(_rtt_ema, 0.001f, RTT_MAX);
} }
void FlowOnly::addRTT(float new_delay) { void FlowOnly::addRTT(float new_delay) {
if (new_delay > _rtt_ema * RTT_UP_MAX) {
// too large a jump up, to be taken into account
return;
}
// lerp(new_delay, rtt_ema, 0.1) // lerp(new_delay, rtt_ema, 0.1)
_rtt_ema = RTT_EMA_ALPHA * new_delay + (1.f - RTT_EMA_ALPHA) * _rtt_ema; _rtt_ema = RTT_EMA_ALPHA * new_delay + (1.f - RTT_EMA_ALPHA) * _rtt_ema;
} }
@ -23,7 +29,40 @@ void FlowOnly::updateWindow(void) {
_fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE); _fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
} }
size_t FlowOnly::canSend(void) { void FlowOnly::updateCongestion(void) {
const auto tmp_window = getWindow();
// packet window * 0.3
// but atleast 4
int32_t max_consecutive_events = std::clamp<int32_t>(
(tmp_window/MAXIMUM_SEGMENT_DATA_SIZE) * 0.3f,
4,
50 // limit TODO: fix idle/time starved algo
);
// TODO: magic number
#if 0
std::cout << "NGC_FT1 Flow: pkg out of order"
<< " w:" << tmp_window
<< " pw:" << tmp_window/MAXIMUM_SEGMENT_DATA_SIZE
<< " coe:" << _consecutive_events
<< " mcoe:" << max_consecutive_events
<< "\n";
#endif
if (_consecutive_events > max_consecutive_events) {
//std::cout << "CONGESTION! NGC_FT1 flow: pkg out of order\n";
onCongestion();
// TODO: set _consecutive_events to zero?
}
}
float FlowOnly::getWindow(void) {
updateWindow();
return _fwnd;
}
int64_t FlowOnly::canSend(float time_delta) {
if (_in_flight.empty()) { if (_in_flight.empty()) {
assert(_in_flight_bytes == 0); assert(_in_flight_bytes == 0);
return MAXIMUM_SEGMENT_DATA_SIZE; return MAXIMUM_SEGMENT_DATA_SIZE;
@ -31,16 +70,17 @@ size_t FlowOnly::canSend(void) {
updateWindow(); updateWindow();
const int64_t fspace = _fwnd - _in_flight_bytes; int64_t fspace = _fwnd - _in_flight_bytes;
if (fspace < MAXIMUM_SEGMENT_DATA_SIZE) { if (fspace < MAXIMUM_SEGMENT_DATA_SIZE) {
return 0u; return 0u;
} }
// limit to whole packets // also limit to max sendrate per tick, which is usually smaller than window
size_t space = std::floor(fspace / MAXIMUM_SEGMENT_DATA_SIZE) // this is mostly to prevent spikes on empty windows
* MAXIMUM_SEGMENT_DATA_SIZE; fspace = std::min<int64_t>(fspace, max_byterate_allowed * time_delta + 0.5f);
return space; // limit to whole packets
return (fspace / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
} }
std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const { std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
@ -49,7 +89,7 @@ std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
// after 3 rtt delay, we trigger timeout // after 3 rtt delay, we trigger timeout
const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f; const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;
for (const auto& [seq, time_stamp, size] : _in_flight) { for (const auto& [seq, time_stamp, size, _] : _in_flight) {
if (now_adjusted > time_stamp) { if (now_adjusted > time_stamp) {
list.push_back(seq); list.push_back(seq);
} }
@ -58,16 +98,28 @@ std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
return list; return list;
} }
int64_t FlowOnly::inFlightCount(void) const {
return _in_flight.size();
}
void FlowOnly::onSent(SeqIDType seq, size_t data_size) { void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
if constexpr (true) { if constexpr (true) {
for (const auto& it : _in_flight) { for (const auto& it : _in_flight) {
assert(std::get<0>(it) != seq); assert(it.id != seq);
} }
} }
_in_flight.push_back({seq, getTimeNow(), data_size + SEGMENT_OVERHEAD}); const auto& new_entry = _in_flight.emplace_back(
_in_flight_bytes += data_size + SEGMENT_OVERHEAD; FlyingBunch{
//_recently_sent_bytes += data_size + SEGMENT_OVERHEAD; seq,
static_cast<float>(getTimeNow()),
data_size + SEGMENT_OVERHEAD,
false
}
);
_in_flight_bytes += new_entry.bytes;
_time_point_last_update = getTimeNow();
} }
void FlowOnly::onAck(std::vector<SeqIDType> seqs) { void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
@ -78,28 +130,31 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
const auto now {getTimeNow()}; const auto now {getTimeNow()};
_time_point_last_update = now;
// first seq in seqs is the actual value, all extra are for redundency // first seq in seqs is the actual value, all extra are for redundency
{ // skip in ack is congestion event { // skip in ack is congestion event
// 1. look at primary ack of packet // 1. look at primary ack of packet
auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq = seqs.front()](const auto& v) -> bool { auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq = seqs.front()](const auto& v) -> bool {
return std::get<0>(v) == seq; return v.id == seq;
}); });
if (it != _in_flight.end()) { if (it != _in_flight.end() && !it->ignore) {
if (it != _in_flight.begin()) { // find first non ignore, it should be the expected
auto first_it = std::find_if_not(_in_flight.cbegin(), _in_flight.cend(), [](const auto& v) -> bool { return v.ignore; });
if (first_it != _in_flight.cend() && it != first_it) {
// not next expected seq -> skip detected // not next expected seq -> skip detected
std::cout << "CONGESTION out of order\n"; _consecutive_events++;
onCongestion(); it->ignore = true; // only handle once
//if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
//_recently_lost_data = true; updateCongestion();
//_last_congestion_event = getTimeNow();
//_last_congestion_rtt = getCurrentDelay();
//}
} else { } else {
// only mesure delay, if not a congestion // only mesure delay, if not a congestion
addRTT(now - std::get<1>(*it)); addRTT(now - it->timestamp);
_consecutive_events = 0;
} }
} else { } else { // TOOD: if ! ignore too
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#if 0 #if 0
// assume we got a duplicated packet // assume we got a duplicated packet
@ -111,14 +166,14 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
for (const auto& seq : seqs) { for (const auto& seq : seqs) {
auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool { auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
return std::get<0>(v) == seq; return v.id == seq;
}); });
if (it == _in_flight.end()) { if (it == _in_flight.end()) {
continue; // not found, ignore continue; // not found, ignore
} else { } else {
//most_recent = std::max(most_recent, std::get<1>(*it)); //most_recent = std::max(most_recent, std::get<1>(*it));
_in_flight_bytes -= std::get<2>(*it); _in_flight_bytes -= it->bytes;
assert(_in_flight_bytes >= 0); assert(_in_flight_bytes >= 0);
//_recently_acked_data += std::get<2>(*it); //_recently_acked_data += std::get<2>(*it);
_in_flight.erase(it); _in_flight.erase(it);
@ -128,8 +183,8 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
void FlowOnly::onLoss(SeqIDType seq, bool discard) { void FlowOnly::onLoss(SeqIDType seq, bool discard) {
auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool { auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
assert(!std::isnan(std::get<1>(v))); assert(!std::isnan(v.timestamp));
return std::get<0>(v) == seq; return v.id == seq;
}); });
if (it == _in_flight.end()) { if (it == _in_flight.end()) {
@ -137,24 +192,27 @@ void FlowOnly::onLoss(SeqIDType seq, bool discard) {
return; // not found, ignore ?? return; // not found, ignore ??
} }
std::cerr << "FLOW loss\n"; //std::cerr << "FLOW loss\n";
// "if data lost is not to be retransmitted" // "if data lost is not to be retransmitted"
if (discard) { if (discard) {
_in_flight_bytes -= std::get<2>(*it); _in_flight_bytes -= it->bytes;
assert(_in_flight_bytes >= 0); assert(_in_flight_bytes >= 0);
_in_flight.erase(it); _in_flight.erase(it);
} else {
// and not take into rtt
it->timestamp = getTimeNow();
it->ignore = true;
} }
// TODO: reset timestamp?
#if 0 // temporarily disable ce for timeout // usually after data arrived out-of-order/duplicate
// at most once per rtt? if (!it->ignore) {
// TODO: use delay at event instead it->ignore = true; // only handle once
if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) { //_consecutive_events++;
_recently_lost_data = true;
_last_congestion_event = getTimeNow(); //updateCongestion();
_last_congestion_rtt = getCurrentDelay(); // this is usually a safe indicator for congestion/maxed connection
onCongestion();
} }
#endif
} }

View File

@ -11,17 +11,10 @@ struct FlowOnly : public CCAI {
using clock = std::chrono::steady_clock; using clock = std::chrono::steady_clock;
public: // config public: // config
static constexpr float RTT_EMA_ALPHA = 0.1f; // might need over time static constexpr float RTT_EMA_ALPHA = 0.001f; // might need change over time
static constexpr float RTT_UP_MAX = 3.0f; // how much larger a delay can be to be taken into account
static constexpr float RTT_MAX = 2.f; // 2 sec is probably too much static constexpr float RTT_MAX = 2.f; // 2 sec is probably too much
//float max_byterate_allowed {100.f*1024*1024}; // 100MiB/s
float max_byterate_allowed {10.f*1024*1024}; // 10MiB/s
//float max_byterate_allowed {1.f*1024*1024}; // 1MiB/s
//float max_byterate_allowed {0.6f*1024*1024}; // 600KiB/s
//float max_byterate_allowed {0.5f*1024*1024}; // 500KiB/s
//float max_byterate_allowed {0.05f*1024*1024}; // 50KiB/s
//float max_byterate_allowed {0.15f*1024*1024}; // 150KiB/s
protected: protected:
// initialize to low value, will get corrected very fast // initialize to low value, will get corrected very fast
float _fwnd {0.01f * max_byterate_allowed}; // in bytes float _fwnd {0.01f * max_byterate_allowed}; // in bytes
@ -30,11 +23,24 @@ struct FlowOnly : public CCAI {
float _rtt_ema {0.1f}; float _rtt_ema {0.1f};
// list of sequence ids and timestamps of when they where sent (and payload size) // list of sequence ids and timestamps of when they where sent (and payload size)
std::vector<std::tuple<SeqIDType, float, size_t>> _in_flight; struct FlyingBunch {
SeqIDType id;
float timestamp;
size_t bytes;
// set to true if counted as ce or resent due to timeout
bool ignore {false};
};
std::vector<FlyingBunch> _in_flight;
int64_t _in_flight_bytes {0}; int64_t _in_flight_bytes {0};
int32_t _consecutive_events {0};
clock::time_point _time_start_offset; clock::time_point _time_start_offset;
// used to clamp growth rate in the void
double _time_point_last_update {getTimeNow()};
protected: protected:
// make values relative to algo start for readability (and precision) // make values relative to algo start for readability (and precision)
// get timestamp in seconds // get timestamp in seconds
@ -44,7 +50,9 @@ struct FlowOnly : public CCAI {
// moving avg over the last few delay samples // moving avg over the last few delay samples
// VERY sensitive to bundling acks // VERY sensitive to bundling acks
float getCurrentDelay(void) const; float getCurrentDelay(void) const override;
float getWindow(void) override;
void addRTT(float new_delay); void addRTT(float new_delay);
@ -52,17 +60,23 @@ struct FlowOnly : public CCAI {
virtual void onCongestion(void) {}; virtual void onCongestion(void) {};
// internal logic, calls the onCongestion() event
void updateCongestion(void);
public: // api public: // api
FlowOnly(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {} FlowOnly(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {}
virtual ~FlowOnly(void) {}
// TODO: api for how much data we should send // TODO: api for how much data we should send
// take time since last sent into account // take time since last sent into account
// respect max_byterate_allowed // respect max_byterate_allowed
size_t canSend(void) override; int64_t canSend(float time_delta) override;
// get the list of timed out seq_ids // get the list of timed out seq_ids
std::vector<SeqIDType> getTimeouts(void) const override; std::vector<SeqIDType> getTimeouts(void) const override;
int64_t inFlightCount(void) const override;
public: // callbacks public: // callbacks
// data size is without overhead // data size is without overhead
void onSent(SeqIDType seq, size_t data_size) override; void onSent(SeqIDType seq, size_t data_size) override;

View File

@ -6,6 +6,7 @@
#include <deque> #include <deque>
#include <cstdint> #include <cstdint>
#include <cassert> #include <cassert>
#include <tuple>
#include <iomanip> #include <iomanip>
#include <iostream> #include <iostream>
@ -19,7 +20,7 @@ LEDBAT::LEDBAT(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_siz
_time_start_offset = clock::now(); _time_start_offset = clock::now();
} }
size_t LEDBAT::canSend(void) { int64_t LEDBAT::canSend(float time_delta) {
if (_in_flight.empty()) { if (_in_flight.empty()) {
return MAXIMUM_SEGMENT_DATA_SIZE; return MAXIMUM_SEGMENT_DATA_SIZE;
} }
@ -34,9 +35,7 @@ size_t LEDBAT::canSend(void) {
return 0u; return 0u;
} }
size_t space = std::ceil(std::min<float>(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE; return std::ceil(std::min<float>(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
return space;
} }
std::vector<LEDBAT::SeqIDType> LEDBAT::getTimeouts(void) const { std::vector<LEDBAT::SeqIDType> LEDBAT::getTimeouts(void) const {

View File

@ -11,7 +11,7 @@
// LEDBAT++: https://www.ietf.org/archive/id/draft-irtf-iccrg-ledbat-plus-plus-01.txt // LEDBAT++: https://www.ietf.org/archive/id/draft-irtf-iccrg-ledbat-plus-plus-01.txt
// LEDBAT++ implementation // LEDBAT++ implementation
struct LEDBAT : public CCAI{ struct LEDBAT : public CCAI {
public: // config public: // config
#if 0 #if 0
using SeqIDType = std::pair<uint8_t, uint16_t>; // tf_id, seq_id using SeqIDType = std::pair<uint8_t, uint16_t>; // tf_id, seq_id
@ -47,21 +47,20 @@ struct LEDBAT : public CCAI{
//static constexpr size_t rtt_buffer_size_max {2000}; //static constexpr size_t rtt_buffer_size_max {2000};
float max_byterate_allowed {10*1024*1024}; // 10MiB/s
public: public:
LEDBAT(size_t maximum_segment_data_size); LEDBAT(size_t maximum_segment_data_size);
virtual ~LEDBAT(void) {}
// return the current believed window in bytes of how much data can be inflight, // return the current believed window in bytes of how much data can be inflight,
// without overstepping the delay requirement // without overstepping the delay requirement
float getCWnD(void) const { float getWindow(void) override {
return _cwnd; return _cwnd;
} }
// TODO: api for how much data we should send // TODO: api for how much data we should send
// take time since last sent into account // take time since last sent into account
// respect max_byterate_allowed // respect max_byterate_allowed
size_t canSend(void) override; int64_t canSend(float time_delta) override;
// get the list of timed out seq_ids // get the list of timed out seq_ids
std::vector<SeqIDType> getTimeouts(void) const override; std::vector<SeqIDType> getTimeouts(void) const override;
@ -86,7 +85,7 @@ struct LEDBAT : public CCAI{
// moving avg over the last few delay samples // moving avg over the last few delay samples
// VERY sensitive to bundling acks // VERY sensitive to bundling acks
float getCurrentDelay(void) const; float getCurrentDelay(void) const override;
void addRTT(float new_delay); void addRTT(float new_delay);

View File

@ -1,9 +1,14 @@
#include "./ngcft1.hpp" #include "./ngcft1.hpp"
#include <solanaceae/toxcore/utils.hpp> #include "./flow_only.hpp"
#include "./cubic.hpp"
#include "./ledbat.hpp"
#include <solanaceae/util/utils.hpp>
#include <sodium.h> #include <sodium.h>
#include <cstdint>
#include <iostream> #include <iostream>
#include <set> #include <set>
#include <algorithm> #include <algorithm>
@ -71,6 +76,12 @@ bool NGCFT1::sendPKG_FT1_INIT_ACK(
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_INIT_ACK)); pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_INIT_ACK));
pkg.push_back(transfer_id); pkg.push_back(transfer_id);
// - 2 bytes max_lossy_data_size
const uint16_t max_lossy_data_size = _t.toxGroupMaxCustomLossyPacketLength() - 4;
for (size_t i = 0; i < sizeof(uint16_t); i++) {
pkg.push_back((max_lossy_data_size>>(i*8)) & 0xff);
}
// lossless // lossless
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
} }
@ -87,6 +98,7 @@ bool NGCFT1::sendPKG_FT1_DATA(
// check header_size+data_size <= max pkg size // check header_size+data_size <= max pkg size
std::vector<uint8_t> pkg; std::vector<uint8_t> pkg;
pkg.reserve(2048); // saves a ton of allocations
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA)); pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA));
pkg.push_back(transfer_id); pkg.push_back(transfer_id);
pkg.push_back(sequence_id & 0xff); pkg.push_back(sequence_id & 0xff);
@ -107,6 +119,7 @@ bool NGCFT1::sendPKG_FT1_DATA_ACK(
const uint16_t* seq_ids, size_t seq_ids_size const uint16_t* seq_ids, size_t seq_ids_size
) { ) {
std::vector<uint8_t> pkg; std::vector<uint8_t> pkg;
pkg.reserve(1+1+2*32); // 32acks in a single pkg should be unlikely
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA_ACK)); pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA_ACK));
pkg.push_back(transfer_id); pkg.push_back(transfer_id);
@ -143,7 +156,7 @@ bool NGCFT1::sendPKG_FT1_MESSAGE(
return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK; return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK;
} }
void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set) { void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size) {
auto& tf_opt = peer.send_transfers.at(idx); auto& tf_opt = peer.send_transfers.at(idx);
assert(tf_opt.has_value()); assert(tf_opt.has_value());
auto& tf = tf_opt.value(); auto& tf = tf_opt.value();
@ -175,22 +188,36 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
} }
//break; //break;
return; return;
case State::SENDING: { case State::FINISHING: // we still have unacked packets
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) {
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
}
});
if (tf.time_since_activity >= sending_give_up_after) {
// no ack after 30sec, close ft
// TODO: notify app
std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
// clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) { tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
// no ack after 5 sec -> resend peer.cca->onLoss({idx, id}, true);
//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) { timeouts_set.erase({idx, id});
if (timeouts_set.count({idx, id})) {
// TODO: can fail
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
}
}); });
if (tf.time_since_activity >= sending_give_up_after) { tf_opt.reset();
}
break;
case State::SENDING: {
// first handle overall timeout (could otherwise do resends directly before, which is useless)
// timeout increases with active transfers (otherwise we could starve them)
if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) {
// no ack after 30sec, close ft // no ack after 30sec, close ft
std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting\n"; std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting (ifc:" << peer.cca->inFlightCount() << ")\n";
dispatch( dispatch(
NGCFT1_Event::send_done, NGCFT1_Event::send_done,
Events::NGCFT1_send_done{ Events::NGCFT1_send_done{
@ -210,22 +237,23 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
return; return;
} }
// do resends
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) {
// TODO: can fail
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
}
});
// if chunks in flight < window size (2) // if chunks in flight < window size (2)
//while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) {
int64_t can_packet_size {static_cast<int64_t>(peer.cca->canSend())};
//if (can_packet_size) {
//std::cerr << "FT: can_packet_size: " << can_packet_size;
//}
size_t count {0};
while (can_packet_size > 0 && tf.file_size > 0) { while (can_packet_size > 0 && tf.file_size > 0) {
std::vector<uint8_t> new_data; std::vector<uint8_t> new_data;
// TODO: parameterize packet size? -> only if JF increases lossy packet size >:)
//size_t chunk_size = std::min<size_t>(496u, tf.file_size - tf.file_size_current);
//size_t chunk_size = std::min<size_t>(can_packet_size, tf.file_size - tf.file_size_current);
size_t chunk_size = std::min<size_t>({ size_t chunk_size = std::min<size_t>({
//496u,
//996u,
peer.cca->MAXIMUM_SEGMENT_DATA_SIZE, peer.cca->MAXIMUM_SEGMENT_DATA_SIZE,
static_cast<size_t>(can_packet_size), static_cast<size_t>(can_packet_size),
tf.file_size - tf.file_size_current tf.file_size - tf.file_size_current
@ -237,14 +265,6 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
new_data.resize(chunk_size); new_data.resize(chunk_size);
//ngc_ft1_ctx->cb_send_data[tf.file_kind](
//tox,
//group_number, peer_number,
//idx,
//tf.file_size_current,
//new_data.data(), new_data.size(),
//ngc_ft1_ctx->ud_send_data.count(tf.file_kind) ? ngc_ft1_ctx->ud_send_data.at(tf.file_kind) : nullptr
//);
assert(idx <= 0xffu); assert(idx <= 0xffu);
// TODO: check return value // TODO: check return value
dispatch( dispatch(
@ -258,45 +278,19 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
); );
uint16_t seq_id = tf.ssb.add(std::move(new_data)); uint16_t seq_id = tf.ssb.add(std::move(new_data));
sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size()); const bool sent = sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
peer.cca->onSent({idx, seq_id}, chunk_size); if (sent) {
peer.cca->onSent({idx, seq_id}, chunk_size);
#if defined(EXTRA_LOGGING) && EXTRA_LOGGING == 1 } else {
fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id); std::cerr << "NGCFT1: failed to send packet (queue full?) --------------\n";
#endif peer.cca->onLoss({idx, seq_id}, false); // HACK: fake congestion event
// TODO: onCongestion
can_packet_size = 0;
}
tf.file_size_current += chunk_size; tf.file_size_current += chunk_size;
can_packet_size -= chunk_size; can_packet_size -= chunk_size;
count++;
} }
//if (count) {
//std::cerr << " split over " << count << "\n";
//}
}
break;
case State::FINISHING: // we still have unacked packets
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
// no ack after 5 sec -> resend
//if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
if (timeouts_set.count({idx, id})) {
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
}
});
if (tf.time_since_activity >= sending_give_up_after) {
// no ack after 30sec, close ft
// TODO: notify app
std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
// clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
peer.cca->onLoss({idx, id}, true);
timeouts_set.erase({idx, id});
});
tf_opt.reset();
} }
break; break;
default: // invalid state, delete default: // invalid state, delete
@ -308,12 +302,33 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
} }
void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) { void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
auto timeouts = peer.cca->getTimeouts(); if (peer.cca) {
std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()}; auto timeouts = peer.cca->getTimeouts();
std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) { int64_t can_packet_size {peer.cca->canSend(time_delta)}; // might get more space while iterating (time)
if (peer.send_transfers.at(idx).has_value()) {
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set); // get number current running transfers TODO: improve
peer.active_send_transfers = 0;
for (const auto& it : peer.send_transfers) {
if (it.has_value()) {
peer.active_send_transfers++;
}
}
// change iterate start position to not starve transfers in the back
size_t iterated_count = 0;
bool last_send_found = false;
for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) {
idx = idx % peer.send_transfers.size();
if (peer.send_transfers.at(idx).has_value()) {
if (!last_send_found && can_packet_size <= 0) {
peer.next_send_transfer_send_idx = idx;
last_send_found = true; // only set once
}
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size);
}
} }
} }
@ -333,15 +348,42 @@ NGCFT1::NGCFT1(
_neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK); _neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK);
_neep.subscribe(this, NGCEXT_Event::FT1_MESSAGE); _neep.subscribe(this, NGCEXT_Event::FT1_MESSAGE);
_tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_PEER_EXIT); _tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT);
} }
void NGCFT1::iterate(float time_delta) { float NGCFT1::iterate(float time_delta) {
bool transfer_in_progress {false};
for (auto& [group_number, group] : groups) { for (auto& [group_number, group] : groups) {
for (auto& [peer_number, peer] : group.peers) { for (auto& [peer_number, peer] : group.peers) {
iteratePeer(time_delta, group_number, peer_number, peer); iteratePeer(time_delta, group_number, peer_number, peer);
// find any active transfer
if (!transfer_in_progress) {
for (const auto& t : peer.send_transfers) {
if (t.has_value()) {
transfer_in_progress = true;
break;
}
}
}
if (!transfer_in_progress) {
for (const auto& t : peer.recv_transfers) {
if (t.has_value()) {
transfer_in_progress = true;
break;
}
}
}
} }
} }
if (transfer_in_progress) {
// ~15ms for up to 1mb/s
// ~5ms for up to 4mb/s
return 0.005f; // 5ms
} else {
return 1.f; // once a sec might be too little
}
} }
void NGCFT1::NGC_FT1_send_request_private( void NGCFT1::NGC_FT1_send_request_private(
@ -487,7 +529,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) { bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
//#if !NDEBUG //#if !NDEBUG
std::cout << "NGCFT1: FT1_INIT_ACK\n"; std::cout << "NGCFT1: FT1_INIT_ACK mds:" << e.max_lossy_data_size << "\n";
//#endif //#endif
// we now should start sending data // we now should start sending data
@ -507,10 +549,28 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
using State = Group::Peer::SendTransfer::State; using State = Group::Peer::SendTransfer::State;
if (transfer.state != State::INIT_SENT) { if (transfer.state != State::INIT_SENT) {
std::cerr << "NGCFT1 error: inti_ack but not in INIT_SENT state\n"; std::cerr << "NGCFT1 error: init_ack but not in INIT_SENT state\n";
return true; return true;
} }
// negotiated packet_data_size
const auto negotiated_packet_data_size = std::min<uint32_t>(e.max_lossy_data_size, _t.toxGroupMaxCustomLossyPacketLength()-4);
// TODO: reset cca with new pkg size
if (!peer.cca) {
// make random max of [1020-1220]
const uint32_t random_max_data_size = (1024-4) + _rng()%201;
const uint32_t randomized_negotiated_packet_data_size = std::min(negotiated_packet_data_size, random_max_data_size);
peer.max_packet_data_size = randomized_negotiated_packet_data_size;
std::cerr << "NGCFT1: creating cca with max:" << peer.max_packet_data_size << "\n";
peer.cca = std::make_unique<CUBIC>(peer.max_packet_data_size);
//peer.cca = std::make_unique<LEDBAT>(peer.max_packet_data_size);
//peer.cca = std::make_unique<FlowOnly>(peer.max_packet_data_size);
//peer.cca->max_byterate_allowed = 1.f *1024*1024;
}
// iterate will now call NGC_FT1_send_data_cb // iterate will now call NGC_FT1_send_data_cb
transfer.state = State::SENDING; transfer.state = State::SENDING;
transfer.time_since_activity = 0.f; transfer.time_since_activity = 0.f;
@ -520,7 +580,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) { bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
#if !NDEBUG #if !NDEBUG
std::cout << "NGCFT1: FT1_DATA\n"; //std::cout << "NGCFT1: FT1_DATA\n";
#endif #endif
if (e.data.empty()) { if (e.data.empty()) {
@ -599,7 +659,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
Group::Peer& peer = groups[e.group_number].peers[e.peer_number]; Group::Peer& peer = groups[e.group_number].peers[e.peer_number];
if (!peer.send_transfers[e.transfer_id].has_value()) { if (!peer.send_transfers[e.transfer_id].has_value()) {
std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n"; // we delete directly, packets might still be in flight (in practice they are when ce)
//std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n";
return true; return true;
} }
@ -625,7 +686,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
// delete if all packets acked // delete if all packets acked
if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) { if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) {
std::cout << "NGCFT1: " << int(e.transfer_id) << " done\n"; std::cout << "NGCFT1: " << int(e.transfer_id) << " done. wnd:" << peer.cca->getWindow() << "\n";
dispatch( dispatch(
NGCFT1_Event::send_done, NGCFT1_Event::send_done,
Events::NGCFT1_send_done{ Events::NGCFT1_send_done{
@ -711,7 +772,7 @@ bool NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
} }
// reset cca // reset cca
peer.cca = std::make_unique<CUBIC>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4 peer.cca.reset(); // dont actually reallocate
return false; return false;
} }

View File

@ -2,22 +2,22 @@
// solanaceae port of tox_ngc_ft1 // solanaceae port of tox_ngc_ft1
#include <solanaceae/toxcore/tox_interface.hpp>
#include <solanaceae/toxcore/tox_event_interface.hpp> #include <solanaceae/toxcore/tox_event_interface.hpp>
#include <solanaceae/toxcore/tox_interface.hpp>
#include <solanaceae/ngc_ext/ngcext.hpp> #include <solanaceae/ngc_ext/ngcext.hpp>
#include "./cubic.hpp" #include "./cca.hpp"
//#include "./flow_only.hpp"
//#include "./ledbat.hpp"
#include "./rcv_buf.hpp" #include "./rcv_buf.hpp"
#include "./snd_buf.hpp" #include "./snd_buf.hpp"
#include "./ngcft1_file_kind.hpp" #include "./ngcft1_file_kind.hpp"
#include <cstdint>
#include <map> #include <map>
#include <set> #include <set>
#include <memory> #include <memory>
#include <random>
namespace Events { namespace Events {
@ -133,15 +133,19 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
ToxEventProviderI& _tep; ToxEventProviderI& _tep;
NGCEXTEventProviderI& _neep; NGCEXTEventProviderI& _neep;
std::default_random_engine _rng{std::random_device{}()};
// TODO: config // TODO: config
size_t acks_per_packet {3u}; // 3 size_t acks_per_packet {3u}; // 3
float init_retry_timeout_after {5.f}; // 10sec float init_retry_timeout_after {4.f};
float sending_give_up_after {30.f}; // 30sec float sending_give_up_after {15.f}; // 30sec (per active transfer)
struct Group { struct Group {
struct Peer { struct Peer {
std::unique_ptr<CCAI> cca = std::make_unique<CUBIC>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4 uint32_t max_packet_data_size {500-4};
//std::unique_ptr<CCAI> cca = std::make_unique<CUBIC>(max_packet_data_size); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
std::unique_ptr<CCAI> cca;
struct RecvTransfer { struct RecvTransfer {
uint32_t file_kind; uint32_t file_kind;
@ -188,6 +192,9 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
}; };
std::array<std::optional<SendTransfer>, 256> send_transfers; std::array<std::optional<SendTransfer>, 256> send_transfers;
size_t next_send_transfer_idx {0}; // next id will be 0 size_t next_send_transfer_idx {0}; // next id will be 0
size_t next_send_transfer_send_idx {0};
size_t active_send_transfers {0};
}; };
std::map<uint32_t, Peer> peers; std::map<uint32_t, Peer> peers;
}; };
@ -201,7 +208,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size); bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size);
bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size); bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size);
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set); void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer); void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
public: public:
@ -211,7 +218,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
NGCEXTEventProviderI& neep NGCEXTEventProviderI& neep
); );
void iterate(float delta); float iterate(float delta);
public: // ft1 api public: // ft1 api
// TODO: public variant? // TODO: public variant?

View File

@ -1,58 +1,76 @@
#pragma once #pragma once
#include <solanaceae/message3/file.hpp> #include <solanaceae/file/file2.hpp>
#include "./mio.hpp" #include "./mio.hpp"
#include <filesystem> #include <filesystem>
#include <fstream> #include <fstream>
#include <iostream>
#include <cstring> #include <cstring>
struct FileRWMapped : public FileI { struct File2RWMapped : public File2I {
mio::ummap_sink _file_map; mio::ummap_sink _file_map;
// TODO: add truncate support? // TODO: add truncate support?
FileRWMapped(std::string_view file_path, uint64_t file_size) { // TODO: rw always true?
_file_size = file_size; File2RWMapped(std::string_view file_path, int64_t file_size = -1) : File2I(true, true) {
std::filesystem::path native_file_path{file_path};
if (!std::filesystem::exists(file_path)) { if (!std::filesystem::exists(native_file_path)) {
std::ofstream(std::string{file_path}) << '\0'; // force create the file std::ofstream(native_file_path) << '\0'; // force create the file
}
_file_size = std::filesystem::file_size(native_file_path);
if (file_size >= 0 && _file_size != file_size) {
_file_size = file_size;
std::filesystem::resize_file(native_file_path, file_size); // ensure size, usually sparse
} }
std::filesystem::resize_file(file_path, file_size); // ensure size, usually sparse
std::error_code err; std::error_code err;
// sink, is also read // sink, is also read
_file_map.map(std::string{file_path}, 0, file_size, err); _file_map.map(native_file_path.u8string(), 0, _file_size, err);
if (err) { if (err) {
// TODO: errro std::cerr << "FileRWMapped error: mapping file failed " << err << "\n";
return; return;
} }
} }
virtual ~FileRWMapped(void) override {} virtual ~File2RWMapped(void) override {}
bool isGood(void) override { bool isGood(void) override {
return _file_map.is_mapped(); return _file_map.is_mapped();
} }
std::vector<uint8_t> read(uint64_t pos, uint64_t size) override { bool write(const ByteSpan data, int64_t pos = -1) override {
if (pos+size > _file_size) { // TODO: support streaming write
//assert(false && "read past end"); if (pos < 0) {
return {};
}
return {_file_map.data()+pos, _file_map.data()+(pos+size)};
}
bool write(uint64_t pos, const std::vector<uint8_t>& data) override {
if (pos+data.size() > _file_size) {
return false; return false;
} }
std::memcpy(_file_map.data()+pos, data.data(), data.size()); if (data.empty()) {
return true; // false?
}
// file size is fix for mmaped files
if (pos+data.size > _file_size) {
return false;
}
std::memcpy(_file_map.data()+pos, data.ptr, data.size);
return true; return true;
} }
ByteSpanWithOwnership read(uint64_t size, int64_t pos = -1) override {
if (pos+size > _file_size) {
//assert(false && "read past end");
return ByteSpan{};
}
// return non-owning
return ByteSpan{_file_map.data()+pos, size};
}
}; };

View File

@ -1,14 +1,12 @@
#include "./sha1_ngcft1.hpp" #include "./sha1_ngcft1.hpp"
#include <solanaceae/toxcore/utils.hpp> #include <solanaceae/util/utils.hpp>
#include <solanaceae/contact/components.hpp> #include <solanaceae/contact/components.hpp>
#include <solanaceae/tox_contacts/components.hpp> #include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/message3/components.hpp> #include <solanaceae/message3/components.hpp>
#include <solanaceae/tox_messages/components.hpp> #include <solanaceae/tox_messages/components.hpp>
#include <solanaceae/message3/file_r_file.hpp>
#include "./ft1_sha1_info.hpp" #include "./ft1_sha1_info.hpp"
#include "./hash_utils.hpp" #include "./hash_utils.hpp"
@ -26,11 +24,11 @@
namespace Message::Components { namespace Message::Components {
using Content = ContentHandle; using Content = ObjectHandle;
} // Message::Components } // Message::Components
// TODO: rename to content components // TODO: rename to object components
namespace Components { namespace Components {
struct Messages { struct Messages {
@ -112,8 +110,7 @@ static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
} }
} }
void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash) { void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash) {
// TODO: transfers
for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) { for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) {
// if already in queue // if already in queue
if (i_g == group_number && i_p == peer_number && i_h == hash) { if (i_g == group_number && i_p == peer_number && i_h == hash) {
@ -123,6 +120,32 @@ void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_numbe
} }
} }
// check for running transfer
if (_sending_transfers.count(combineIds(group_number, peer_number))) {
for (const auto& [_, transfer] : _sending_transfers.at(combineIds(group_number, peer_number))) {
if (std::holds_alternative<SendingTransfer::Info>(transfer.v)) {
// ignore info
continue;
}
const auto& t_c = std::get<SendingTransfer::Chunk>(transfer.v);
if (content != t_c.content) {
// ignore different content
continue;
}
auto chunk_idx_vec = content.get<Components::FT1ChunkSHA1Cache>().chunkIndices(hash);
for (size_t idx : chunk_idx_vec) {
if (idx == t_c.chunk_index) {
// already sending
return; // skip
}
}
}
}
// not in queue yet // not in queue yet
_queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, content, hash, 0.f)); _queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, content, hash, 0.f));
} }
@ -131,7 +154,7 @@ uint64_t SHA1_NGCFT1::combineIds(const uint32_t group_number, const uint32_t pee
return (uint64_t(group_number) << 32) | peer_number; return (uint64_t(group_number) << 32) | peer_number;
} }
void SHA1_NGCFT1::updateMessages(ContentHandle ce) { void SHA1_NGCFT1::updateMessages(ObjectHandle ce) {
assert(ce.all_of<Components::Messages>()); assert(ce.all_of<Components::Messages>());
for (auto msg : ce.get<Components::Messages>().messages) { for (auto msg : ce.get<Components::Messages>().messages) {
@ -160,7 +183,7 @@ void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
} }
} }
std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ContentHandle ce) { std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ObjectHandle ce) {
// get a list of peers we can request this file from // get a list of peers we can request this file from
// TODO: randomly request from non SuspectedParticipants // TODO: randomly request from non SuspectedParticipants
std::vector<std::pair<uint32_t, uint32_t>> tox_peers; std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
@ -194,6 +217,10 @@ std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(C
continue; continue;
} }
if (_cr.all_of<Contact::Components::TagSelfStrong>(child)) {
continue; // skip self
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) { if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child); const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
un_tox_peers.push_back(tgpe.peer_number); un_tox_peers.push_back(tgpe.peer_number);
@ -219,11 +246,13 @@ std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(C
} }
SHA1_NGCFT1::SHA1_NGCFT1( SHA1_NGCFT1::SHA1_NGCFT1(
ObjectStore2& os,
Contact3Registry& cr, Contact3Registry& cr,
RegistryMessageModel& rmm, RegistryMessageModel& rmm,
NGCFT1& nft, NGCFT1& nft,
ToxContactModel2& tcm ToxContactModel2& tcm
) : ) :
_os(os),
_cr(cr), _cr(cr),
_rmm(rmm), _rmm(rmm),
_nft(nft), _nft(nft),
@ -266,8 +295,9 @@ void SHA1_NGCFT1::iterate(float delta) {
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) { for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
it->second.time_since_activity += delta; it->second.time_since_activity += delta;
// if we have not heard for 10sec, timeout // if we have not heard for 2min, timeout (lower level event on real timeout)
if (it->second.time_since_activity >= 10.f) { // TODO: do we really need this if we get events?
if (it->second.time_since_activity >= 120.f) {
std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n"; std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n";
it = peer_it->second.erase(it); it = peer_it->second.erase(it);
} else { } else {
@ -320,8 +350,8 @@ void SHA1_NGCFT1::iterate(float delta) {
} }
{ // requested info timers { // requested info timers
std::vector<Content> timed_out; std::vector<Object> timed_out;
_contentr.view<Components::ReRequestInfoTimer>().each([delta, &timed_out](Content e, Components::ReRequestInfoTimer& rrit) { _os.registry().view<Components::ReRequestInfoTimer>().each([delta, &timed_out](Object e, Components::ReRequestInfoTimer& rrit) {
rrit.timer += delta; rrit.timer += delta;
// 15sec, TODO: config // 15sec, TODO: config
@ -331,12 +361,13 @@ void SHA1_NGCFT1::iterate(float delta) {
}); });
for (const auto e : timed_out) { for (const auto e : timed_out) {
// TODO: avoid dups // TODO: avoid dups
_queue_content_want_info.push_back({_contentr, e}); _queue_content_want_info.push_back(_os.objectHandle(e));
_contentr.remove<Components::ReRequestInfoTimer>(e); _os.registry().remove<Components::ReRequestInfoTimer>(e);
// TODO: throw update?
} }
} }
{ // requested chunk timers { // requested chunk timers
_contentr.view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) { _os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) { for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
it->second += delta; it->second += delta;
@ -522,11 +553,11 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
// first, open file for write(+readback) // first, open file for write(+readback)
std::string full_file_path{e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path}; std::string full_file_path{e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path};
// TODO: replace with filesystem or something // TODO: replace with filesystem or something
// TODO: ensure dir exists
if (full_file_path.back() != '/') { if (full_file_path.back() != '/') {
full_file_path += "/"; full_file_path += "/";
} }
// ensure dir exists
std::filesystem::create_directories(full_file_path); std::filesystem::create_directories(full_file_path);
const auto& info = ce.get<Components::FT1InfoSHA1>(); const auto& info = ce.get<Components::FT1InfoSHA1>();
@ -534,14 +565,13 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{full_file_path}); ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{full_file_path});
std::unique_ptr<FileRWMapped> file_impl;
const bool file_exists = std::filesystem::exists(full_file_path); const bool file_exists = std::filesystem::exists(full_file_path);
std::unique_ptr<File2I> file_impl = std::make_unique<File2RWMapped>(full_file_path, info.file_size);
file_impl = std::make_unique<FileRWMapped>(full_file_path, info.file_size);
if (!file_impl->isGood()) { if (!file_impl->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n"; std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
//e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop // we failed opening that filepath, so we should offer the user the oportunity to save it differently
e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop
return false; return false;
} }
@ -557,22 +587,24 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
// iterate existing file // iterate existing file
for (size_t i = 0; i < info.chunks.size(); i++) { for (size_t i = 0; i < info.chunks.size(); i++) {
const uint64_t chunk_size = info.chunkSize(i); const uint64_t chunk_size = info.chunkSize(i);
auto existing_data = file_impl->read(i*uint64_t(info.chunk_size), chunk_size); auto existing_data = file_impl->read(chunk_size, i*uint64_t(info.chunk_size));
assert(existing_data.size() == chunk_size);
// TODO: avoid copy assert(existing_data.size == chunk_size);
if (existing_data.size == chunk_size) {
const auto data_hash = SHA1Digest{hash_sha1(existing_data.ptr, existing_data.size)};
const bool data_equal = data_hash == info.chunks.at(i);
const auto data_hash = SHA1Digest{hash_sha1(existing_data.data(), existing_data.size())}; cc.have_chunk.push_back(data_equal);
const bool data_equal = data_hash == info.chunks.at(i);
cc.have_chunk.push_back(data_equal); if (data_equal) {
cc.have_count += 1;
if (data_equal) { bytes_received += chunk_size;
cc.have_count += 1; //std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n";
bytes_received += chunk_size; } else {
//std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n"; //std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n";
}
} else { } else {
//std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n"; // error reading?
} }
_chunks[info.chunks[i]] = ce; _chunks[info.chunks[i]] = ce;
@ -801,9 +833,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
for (const auto chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) { for (const auto chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
const auto offset_into_file = chunk_index* ce.get<Components::FT1InfoSHA1>().chunk_size; const auto offset_into_file = chunk_index* ce.get<Components::FT1InfoSHA1>().chunk_size;
// TODO: avoid temporary copy if (!file->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
// TODO: check return
if (!file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size})) {
std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n"; std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
} }
} }
@ -841,14 +871,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v); auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>(); const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
// TODO: should we really use file? // TODO: should we really use file?
const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read((chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset, e.data_size); const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read(
e.data_size,
(chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset
);
// TODO: optimize // TODO: optimize
for (size_t i = 0; i < e.data_size && i < data.size(); i++) { for (size_t i = 0; i < e.data_size && i < data.size; i++) {
e.data[i] = data[i]; e.data[i] = data[i];
} }
chunk_transfer.content.get_or_emplace<Message::Components::Transfer::BytesSent>().total += data.size(); chunk_transfer.content.get_or_emplace<Message::Components::Transfer::BytesSent>().total += data.size;
// TODO: add event to propergate to messages // TODO: add event to propergate to messages
//_rmm.throwEventUpdate(transfer); // should we? //_rmm.throwEventUpdate(transfer); // should we?
@ -929,10 +962,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
const auto chunk_size = info.chunkSize(chunk_index); const auto chunk_size = info.chunkSize(chunk_index);
assert(offset_into_file+chunk_size <= info.file_size); assert(offset_into_file+chunk_size <= info.file_size);
const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(offset_into_file, chunk_size); const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(chunk_size, offset_into_file);
assert(!chunk_data.empty());
// check hash of chunk // check hash of chunk
auto got_hash = hash_sha1(chunk_data.data(), chunk_data.size()); auto got_hash = hash_sha1(chunk_data.ptr, chunk_data.size);
if (info.chunks.at(chunk_index) == got_hash) { if (info.chunks.at(chunk_index) == got_hash) {
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n"; std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
@ -954,7 +988,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
// HACK: remap file, to clear ram // HACK: remap file, to clear ram
// TODO: error checking // TODO: error checking
ce.get<Message::Components::Transfer::File>() = std::make_unique<FileRWMapped>( ce.get<Message::Components::Transfer::File>() = std::make_unique<File2RWMapped>(
ce.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(), ce.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(),
info.file_size info.file_size
); );
@ -962,7 +996,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
// good chunk // good chunk
// TODO: have wasted + metadata // TODO: have wasted + metadata
ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size(); ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size;
} }
} }
} else { } else {
@ -1042,19 +1076,29 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
//reg.emplace<Components::TimestampWritten>(new_msg_e, 0); //reg.emplace<Components::TimestampWritten>(new_msg_e, 0);
reg.emplace<Message::Components::Timestamp>(new_msg_e, ts); // reactive? reg.emplace<Message::Components::Timestamp>(new_msg_e, ts); // reactive?
reg.emplace<Message::Components::TagUnread>(new_msg_e);
{ // by whom { // by whom
auto& synced_by = reg.get_or_emplace<Message::Components::SyncedBy>(new_msg_e).list; reg.get_or_emplace<Message::Components::SyncedBy>(new_msg_e).ts.try_emplace(self_c, ts);
synced_by.emplace(self_c); }
{ // we received it, so we have it
auto& rb = reg.get_or_emplace<Message::Components::ReceivedBy>(new_msg_e).ts;
rb.try_emplace(c, ts);
// TODO: how do we handle partial files???
// tox ft rn only sets self if the file was received fully
rb.try_emplace(self_c, ts);
} }
// check if content exists // check if content exists
const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size}; const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
ContentHandle ce; ObjectHandle ce;
if (_info_to_content.count(sha1_info_hash)) { if (_info_to_content.count(sha1_info_hash)) {
ce = _info_to_content.at(sha1_info_hash); ce = _info_to_content.at(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has existing content\n"; std::cout << "SHA1_NGCFT1: new message has existing content\n";
} else { } else {
ce = {_contentr, _contentr.create()}; // TODO: backend
ce = {_os.registry(), _os.registry().create()};
_info_to_content[sha1_info_hash] = ce; _info_to_content[sha1_info_hash] = ce;
std::cout << "SHA1_NGCFT1: new message has new content\n"; std::cout << "SHA1_NGCFT1: new message has new content\n";
@ -1142,9 +1186,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
file_name_ = std::string(file_name), file_name_ = std::string(file_name),
file_path_ = std::string(file_path) file_path_ = std::string(file_path)
]() mutable { ]() mutable {
// TODO: rw? auto file_impl = std::make_unique<File2RWMapped>(file_path_, -1);
// TODO: memory mapped would be king
auto file_impl = std::make_unique<FileRFile>(file_path_);
if (!file_impl->isGood()) { if (!file_impl->isGood()) {
{ {
std::lock_guard l{self->_info_builder_queue_mutex}; std::lock_guard l{self->_info_builder_queue_mutex};
@ -1163,19 +1205,19 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
FT1InfoSHA1 sha1_info; FT1InfoSHA1 sha1_info;
// build info // build info
sha1_info.file_name = file_name_; sha1_info.file_name = file_name_;
sha1_info.file_size = file_impl->_file_size; sha1_info.file_size = file_impl->_file_size; // TODO: remove the reliance on implementation details
{ // build chunks { // build chunks
// HACK: load file fully // HACK: load file fully
// TODO: the speed is truly horrid // ... its only a hack if its not memory mapped, but reading in chunk_sized chunks is probably a good idea anyway
const auto file_data = file_impl->read(0, file_impl->_file_size); const auto file_data = file_impl->read(file_impl->_file_size, 0);
size_t i = 0; size_t i = 0;
for (; i + sha1_info.chunk_size < file_data.size(); i += sha1_info.chunk_size) { for (; i + sha1_info.chunk_size < file_data.size; i += sha1_info.chunk_size) {
sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, sha1_info.chunk_size)); sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, sha1_info.chunk_size));
} }
if (i < file_data.size()) { if (i < file_data.size) {
sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, file_data.size()-i)); sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, file_data.size-i));
} }
} }
@ -1194,7 +1236,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
]() mutable { // ]() mutable { //
// back on iterate thread // back on iterate thread
auto file_impl = std::make_unique<FileRFile>(file_path_); auto file_impl = std::make_unique<File2RWMapped>(file_path_, sha1_info.file_size);
if (!file_impl->isGood()) { if (!file_impl->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n"; std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n";
return; return;
@ -1211,7 +1253,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n"; std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n";
// check if content exists // check if content exists
ContentHandle ce; ObjectHandle ce;
if (self->_info_to_content.count(sha1_info_hash)) { if (self->_info_to_content.count(sha1_info_hash)) {
ce = self->_info_to_content.at(sha1_info_hash); ce = self->_info_to_content.at(sha1_info_hash);
@ -1280,7 +1322,8 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
it = self->_queue_content_want_chunk.erase(it); it = self->_queue_content_want_chunk.erase(it);
} }
} else { } else {
ce = {self->_contentr, self->_contentr.create()}; // TODO: backend
ce = {self->_os.registry(), self->_os.registry().create()};
self->_info_to_content[sha1_info_hash] = ce; self->_info_to_content[sha1_info_hash] = ce;
ce.emplace<Components::FT1InfoSHA1>(sha1_info); ce.emplace<Components::FT1InfoSHA1>(sha1_info);
@ -1325,6 +1368,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c); reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self); reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive? reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
reg_ptr->emplace<Message::Components::Read>(msg_e, ts);
reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e); reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e);
reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e); reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
@ -1357,10 +1401,6 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
// TODO: check return // TODO: check return
self->_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size()); self->_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size());
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id); reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
// TODO: generalize?
auto& synced_by = reg_ptr->emplace<Message::Components::SyncedBy>(msg_e).list;
synced_by.emplace(c_self);
} else if ( } else if (
// non online group // non online group
self->_cr.any_of<Contact::Components::ToxGroupPersistent>(c) self->_cr.any_of<Contact::Components::ToxGroupPersistent>(c)
@ -1368,12 +1408,11 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
// create msg_id // create msg_id
const uint32_t message_id = randombytes_random(); const uint32_t message_id = randombytes_random();
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id); reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
// TODO: generalize?
auto& synced_by = reg_ptr->emplace<Message::Components::SyncedBy>(msg_e).list;
synced_by.emplace(c_self);
} }
reg_ptr->get_or_emplace<Message::Components::SyncedBy>(msg_e).ts.try_emplace(c_self, ts);
reg_ptr->get_or_emplace<Message::Components::ReceivedBy>(msg_e).ts.try_emplace(c_self, ts);
self->_rmm.throwEventConstruct(*reg_ptr, msg_e); self->_rmm.throwEventConstruct(*reg_ptr, msg_e);
// TODO: place in iterate? // TODO: place in iterate?

View File

@ -2,6 +2,7 @@
// solanaceae port of sha1 fts for NGCFT1 // solanaceae port of sha1 fts for NGCFT1
#include <solanaceae/object_store/object_store.hpp>
#include <solanaceae/contact/contact_model3.hpp> #include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/message3/registry_message_model.hpp> #include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/tox_contacts/tox_contact_model2.hpp> #include <solanaceae/tox_contacts/tox_contact_model2.hpp>
@ -20,11 +21,9 @@
#include <mutex> #include <mutex>
#include <list> #include <list>
enum class Content : uint32_t {};
using ContentRegistry = entt::basic_registry<Content>;
using ContentHandle = entt::basic_handle<ContentRegistry>;
class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
ObjectStore2& _os;
// TODO: backend abstraction
Contact3Registry& _cr; Contact3Registry& _cr;
RegistryMessageModel& _rmm; RegistryMessageModel& _rmm;
NGCFT1& _nft; NGCFT1& _nft;
@ -32,21 +31,18 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
std::minstd_rand _rng {1337*11}; std::minstd_rand _rng {1337*11};
// registry per group?
ContentRegistry _contentr;
// limit this to each group? // limit this to each group?
entt::dense_map<SHA1Digest, ContentHandle> _info_to_content; entt::dense_map<SHA1Digest, ObjectHandle> _info_to_content;
// sha1 chunk index // sha1 chunk index
// TODO: optimize lookup // TODO: optimize lookup
// TODO: multiple contents. hashes might be unique, but data is not // TODO: multiple contents. hashes might be unique, but data is not
entt::dense_map<SHA1Digest, ContentHandle> _chunks; entt::dense_map<SHA1Digest, ObjectHandle> _chunks;
// group_number, peer_number, content, chunk_hash, timer // group_number, peer_number, content, chunk_hash, timer
std::deque<std::tuple<uint32_t, uint32_t, ContentHandle, SHA1Digest, float>> _queue_requested_chunk; std::deque<std::tuple<uint32_t, uint32_t, ObjectHandle, SHA1Digest, float>> _queue_requested_chunk;
//void queueUpRequestInfo(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash); //void queueUpRequestInfo(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash);
void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash); void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash);
struct SendingTransfer { struct SendingTransfer {
struct Info { struct Info {
@ -56,7 +52,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
}; };
struct Chunk { struct Chunk {
ContentHandle content; ObjectHandle content;
size_t chunk_index; // <.< remove offset_into_file size_t chunk_index; // <.< remove offset_into_file
//uint64_t offset_into_file; //uint64_t offset_into_file;
// or data? // or data?
@ -72,14 +68,14 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
struct ReceivingTransfer { struct ReceivingTransfer {
struct Info { struct Info {
ContentHandle content; ObjectHandle content;
// copy of info data // copy of info data
// too large? // too large?
std::vector<uint8_t> info_data; std::vector<uint8_t> info_data;
}; };
struct Chunk { struct Chunk {
ContentHandle content; ObjectHandle content;
std::vector<size_t> chunk_indices; std::vector<size_t> chunk_indices;
// or data? // or data?
// if memmapped, this would be just a pointer // if memmapped, this would be just a pointer
@ -93,8 +89,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
entt::dense_map<uint64_t, entt::dense_map<uint8_t, ReceivingTransfer>> _receiving_transfers; entt::dense_map<uint64_t, entt::dense_map<uint8_t, ReceivingTransfer>> _receiving_transfers;
// makes request rotate around open content // makes request rotate around open content
std::deque<ContentHandle> _queue_content_want_info; std::deque<ObjectHandle> _queue_content_want_info;
std::deque<ContentHandle> _queue_content_want_chunk; std::deque<ObjectHandle> _queue_content_want_chunk;
std::atomic_bool _info_builder_dirty {false}; std::atomic_bool _info_builder_dirty {false};
std::mutex _info_builder_queue_mutex; std::mutex _info_builder_queue_mutex;
@ -108,20 +104,21 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number); static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number);
void updateMessages(ContentHandle ce); void updateMessages(ObjectHandle ce);
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ContentHandle ce); std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ObjectHandle ce);
public: // TODO: config public: // TODO: config
bool _udp_only {false}; bool _udp_only {false};
size_t _max_concurrent_in {4}; size_t _max_concurrent_in {6};
size_t _max_concurrent_out {6}; size_t _max_concurrent_out {8};
// TODO: probably also includes running transfers rn (meh) // TODO: probably also includes running transfers rn (meh)
size_t _max_pending_requests {32}; // per content size_t _max_pending_requests {32}; // per content
public: public:
SHA1_NGCFT1( SHA1_NGCFT1(
ObjectStore2& os,
Contact3Registry& cr, Contact3Registry& cr,
RegistryMessageModel& rmm, RegistryMessageModel& rmm,
NGCFT1& nft, NGCFT1& nft,