Compare commits
37 Commits: cca_rework...broken_som

ae3dc74933
0eb30246a8
c52ac19285
1231e792a7
319e754aff
a4201f4407
57575330dd
eb2a19d8f3
dfcb5dee97
0d40d1abaa
61b667a4aa
c03282eae8
5fd1f2ab84
bccd04316a
ccf66fb80c
ea032244e7
0df0760c06
f02b03da7c
103f36f2d2
ad918a3253
70cea0d219
b0e2cab17a
0a53a76eb3
5995059777
abf2645099
7c16c54649
a80e74065c
77f21f01e9
27fd9e688b
f28e79dcbc
7af5fda0a6
f91780c602
1e6929c93b
81a353570b
070585ab3d
ba8befbb2d
a1a9bf886a
@@ -65,5 +65,7 @@ target_link_libraries(solanaceae_sha1_ngcft1 PUBLIC
     sha1::sha1
     solanaceae_tox_contacts
     solanaceae_message3
+    solanaceae_object_store
+    solanaceae_file2
 )
 
@@ -3,8 +3,8 @@
 #include <iostream>
 
 NGCEXTEventProvider::NGCEXTEventProvider(ToxEventProviderI& tep) : _tep(tep) {
-    _tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_CUSTOM_PACKET);
-    _tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET);
+    _tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PACKET);
+    _tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET);
 }
 
 #define _DATA_HAVE(x, error) if ((data_size - curser) < (x)) { error; }
@@ -112,6 +112,8 @@ bool NGCEXTEventProvider::parse_ft1_init_ack(
     _DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
     e.transfer_id = data[curser++];
 
+    e.max_lossy_data_size = 500-4; // -4 and 500 are hardcoded
+
     return dispatch(
         NGCEXT_Event::FT1_INIT_ACK,
         e
@@ -224,6 +226,41 @@ bool NGCEXTEventProvider::parse_ft1_message(
     );
 }
 
+bool NGCEXTEventProvider::parse_ft1_init_ack_v2(
+    uint32_t group_number, uint32_t peer_number,
+    const uint8_t* data, size_t data_size,
+    bool _private
+) {
+    if (!_private) {
+        std::cerr << "NGCEXT: ft1_init_ack_v2 cant be public\n";
+        return false;
+    }
+
+    Events::NGCEXT_ft1_init_ack e;
+    e.group_number = group_number;
+    e.peer_number = peer_number;
+    size_t curser = 0;
+
+    // - 1 byte (temporary_file_tf_id)
+    _DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
+    e.transfer_id = data[curser++];
+
+    // - 2 byte (max_lossy_data_size)
+    if ((data_size - curser) >= sizeof(e.max_lossy_data_size)) {
+        e.max_lossy_data_size = 0;
+        for (size_t i = 0; i < sizeof(e.max_lossy_data_size); i++, curser++) {
+            e.max_lossy_data_size |= uint16_t(data[curser]) << (i*8);
+        }
+    } else {
+        e.max_lossy_data_size = 500-4; // default
+    }
+
+    return dispatch(
+        NGCEXT_Event::FT1_INIT_ACK,
+        e
+    );
+}
+
 bool NGCEXTEventProvider::handlePacket(
     const uint32_t group_number,
     const uint32_t peer_number,
@@ -247,7 +284,8 @@ bool NGCEXTEventProvider::handlePacket(
         case NGCEXT_Event::FT1_INIT:
             return parse_ft1_init(group_number, peer_number, data+1, data_size-1, _private);
         case NGCEXT_Event::FT1_INIT_ACK:
-            return parse_ft1_init_ack(group_number, peer_number, data+1, data_size-1, _private);
+            //return parse_ft1_init_ack(group_number, peer_number, data+1, data_size-1, _private);
+            return parse_ft1_init_ack_v2(group_number, peer_number, data+1, data_size-1, _private);
         case NGCEXT_Event::FT1_DATA:
             return parse_ft1_data(group_number, peer_number, data+1, data_size-1, _private);
         case NGCEXT_Event::FT1_DATA_ACK:
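Not part of the diff itself: a small standalone sketch of the FT1_INIT_ACK v2 payload layout that the new `parse_ft1_init_ack_v2` handles, a 1-byte transfer id followed by an optional 2-byte little-endian max lossy data size with the old 496-byte value as fallback. The `InitAckV2` struct and function name here are illustrative, not taken from the repository.

```cpp
#include <cstdint>
#include <cstddef>
#include <optional>

struct InitAckV2 {                      // illustrative stand-in for Events::NGCEXT_ft1_init_ack
    uint8_t  transfer_id {0};
    uint16_t max_lossy_data_size {500 - 4}; // same default the parser falls back to
};

// layout: [transfer_id : u8][max_lossy_data_size : u16, little endian, optional]
std::optional<InitAckV2> parse_init_ack_v2(const uint8_t* data, size_t size) {
    if (size < 1) {
        return std::nullopt; // missing transfer_id
    }
    InitAckV2 e;
    size_t curser = 0;
    e.transfer_id = data[curser++];
    if (size - curser >= sizeof(uint16_t)) {
        e.max_lossy_data_size = uint16_t(data[curser]) | (uint16_t(data[curser + 1]) << 8);
    }
    return e;
}
```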
@@ -70,8 +70,6 @@ namespace Events {
 
         // - X bytes (file_kind dependent id, differnt sizes)
         std::vector<uint8_t> file_id;
-
-        // TODO: max supported lossy packet size
     };
 
     struct NGCEXT_ft1_init_ack {
@@ -81,7 +79,8 @@ namespace Events {
         // - 1 byte (transfer_id)
         uint8_t transfer_id;
 
-        // TODO: max supported lossy packet size
+        // - 2 byte (self_max_lossy_data_size)
+        uint16_t max_lossy_data_size;
     };
 
     struct NGCEXT_ft1_data {
@@ -163,6 +162,7 @@ enum class NGCEXT_Event : uint8_t {
     // acknowlage init (like an accept)
     // like tox ft control continue
     // - 1 byte (transfer_id)
+    // - 2 byte (self_max_lossy_data_size) (optional since v2)
     FT1_INIT_ACK,
 
     // TODO: init deny, speed up non acceptance
@@ -263,6 +263,12 @@ class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI {
             bool _private
         );
 
+        bool parse_ft1_init_ack_v2(
+            uint32_t group_number, uint32_t peer_number,
+            const uint8_t* data, size_t data_size,
+            bool _private
+        );
+
         bool handlePacket(
             const uint32_t group_number,
             const uint32_t peer_number,
@@ -38,22 +38,34 @@ struct CCAI {
     //static_assert(maximum_segment_size == 574); // mesured in wireshark
 
     // flow control
-    float max_byterate_allowed {10*1024*1024}; // 10MiB/s
+    //float max_byterate_allowed {100.f*1024*1024}; // 100MiB/s
+    float max_byterate_allowed {10.f*1024*1024}; // 10MiB/s
+    //float max_byterate_allowed {1.f*1024*1024}; // 1MiB/s
+    //float max_byterate_allowed {0.6f*1024*1024}; // 600KiB/s
+    //float max_byterate_allowed {0.5f*1024*1024}; // 500KiB/s
+    //float max_byterate_allowed {0.15f*1024*1024}; // 150KiB/s
+    //float max_byterate_allowed {0.05f*1024*1024}; // 50KiB/s
 
     public: // api
         CCAI(size_t maximum_segment_data_size) : MAXIMUM_SEGMENT_DATA_SIZE(maximum_segment_data_size) {}
+        virtual ~CCAI(void) {}
+
+        // returns current rtt/delay
+        virtual float getCurrentDelay(void) const = 0;
 
         // return the current believed window in bytes of how much data can be inflight,
-        //virtual float getCWnD(void) const = 0;
+        virtual float getWindow(void) = 0;
 
         // TODO: api for how much data we should send
         // take time since last sent into account
        // respect max_byterate_allowed
-        virtual size_t canSend(void) = 0;
+        virtual int64_t canSend(float time_delta) = 0;
 
        // get the list of timed out seq_ids
         virtual std::vector<SeqIDType> getTimeouts(void) const = 0;
+
+        virtual int64_t inFlightCount(void) const { return -1; }
 
     public: // callbacks
         // data size is without overhead
         virtual void onSent(SeqIDType seq, size_t data_size) = 0;
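For orientation only, a sketch of how a sender tick could drive the reworked interface shape above: `canSend` now takes the elapsed time and returns a byte budget, and the controller learns from the `onSent`/`onAck`/`onLoss` callbacks. `MockCCA` is a hypothetical stand-in, not the project's CUBIC/FlowOnly/LEDBAT implementations.

```cpp
#include <cstdint>
#include <cstddef>
#include <utility>
#include <vector>

// hypothetical minimal congestion-control object mirroring the CCAI shape
struct MockCCA {
    using SeqIDType = std::pair<uint8_t, uint16_t>; // transfer_id, seq_id
    int64_t budget {4 * 1024};

    int64_t canSend(float /*time_delta*/) { return budget; }       // bytes allowed this tick
    void onSent(SeqIDType, size_t bytes) { budget -= int64_t(bytes); }
    void onAck(const std::vector<SeqIDType>&) { budget += 1024; }   // toy growth
    void onLoss(SeqIDType, bool /*discard*/) { budget /= 2; }       // toy backoff
};

// one sender tick: spend the byte budget in whole segments
void tick(MockCCA& cca, float time_delta, uint16_t& next_seq) {
    constexpr size_t segment = 500 - 4;
    int64_t can = cca.canSend(time_delta);
    while (can >= int64_t(segment)) {
        cca.onSent({0, next_seq++}, segment); // pretend one segment went out
        can -= segment;
    }
}
```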
@@ -3,14 +3,25 @@
 #include <cmath>
 #include <iostream>
 
+void CUBIC::updateReductionTimer(float time_delta) {
+    const auto now {getTimeNow()};
+
+    // only keep updating while the cca interaction is not too long ago
+    if (now - _time_point_last_update <= getCurrentDelay()*4.f) {
+        _time_since_reduction += time_delta;
+    }
+}
+
+void CUBIC::resetReductionTimer(void) {
+    _time_since_reduction = 0.f;
+}
+
 float CUBIC::getCWnD(void) const {
     const double K = cbrt(
         (_window_max * (1. - BETA)) / SCALING_CONSTANT
     );
 
-    const double time_since_reduction = getTimeNow() - _time_point_reduction;
-
-    const double TK = time_since_reduction - K;
+    const double TK = _time_since_reduction - K;
 
     const double cwnd =
         SCALING_CONSTANT
@@ -33,29 +44,70 @@ float CUBIC::getCWnD(void) const {
 }
 
 void CUBIC::onCongestion(void) {
-    if (getTimeNow() - _time_point_reduction >= getCurrentDelay()) {
-        const auto current_cwnd = getCWnD();
-        _time_point_reduction = getTimeNow();
-        _window_max = current_cwnd;
-
-        std::cout << "CONGESTION! cwnd:" << current_cwnd << "\n";
+    // 8 is probably too much (800ms for 100ms rtt)
+    if (_time_since_reduction >= getCurrentDelay()*4.f) {
+        const auto tmp_old_tp = _time_since_reduction;
+
+        const auto current_cwnd = getCWnD(); // TODO: remove, only used by logging?
+        const auto current_wnd = getWindow(); // respects cwnd and fwnd
+
+        _bytes_leftover = 0;
+        resetReductionTimer();
+
+        if (current_cwnd < _window_max) {
+            // congestion before reaching the inflection point (prev window_max).
+            // reduce to wnd*beta to be fair
+            _window_max = current_wnd * BETA;
+        } else {
+            _window_max = current_wnd;
+        }
+
+        _window_max = std::max(_window_max, 2.0*MAXIMUM_SEGMENT_SIZE);
+
+#if 1
+        std::cout << "----CONGESTION!"
+            << " cwnd:" << current_cwnd
+            << " wnd:" << current_wnd
+            << " cwnd_max:" << _window_max
+            << " pts:" << tmp_old_tp
+            << " rtt:" << getCurrentDelay()
+            << "\n"
+        ;
+#endif
     }
 }
 
-size_t CUBIC::canSend(void) {
-    const auto fspace_pkgs = FlowOnly::canSend();
+float CUBIC::getWindow(void) {
+    return std::min<float>(getCWnD(), FlowOnly::getWindow());
+}
+
+int64_t CUBIC::canSend(float time_delta) {
+    const auto fspace_pkgs = FlowOnly::canSend(time_delta);
+
+    updateReductionTimer(time_delta);
 
     if (fspace_pkgs == 0u) {
         return 0u;
     }
 
-    const int64_t cspace_bytes = getCWnD() - _in_flight_bytes;
+    const auto window = getCWnD();
+    int64_t cspace_bytes = (window - _in_flight_bytes) + _bytes_leftover;
     if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) {
         return 0u;
     }
 
+    // also limit to max sendrate per tick, which is usually smaller than window
+    // this is mostly to prevent spikes on empty windows
+    const auto rate = window / getCurrentDelay();
+
+    // we dont want this limit to fall below atleast 1 segment
+    const int64_t max_bytes_per_tick = std::max<int64_t>(rate * time_delta + 0.5f, MAXIMUM_SEGMENT_SIZE);
+    cspace_bytes = std::min<int64_t>(cspace_bytes, max_bytes_per_tick);
+
     // limit to whole packets
-    size_t cspace_pkgs = std::floor(cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
+    int64_t cspace_pkgs = (cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
+
+    _bytes_leftover = cspace_bytes - cspace_pkgs;
 
     return std::min(cspace_pkgs, fspace_pkgs);
 }
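As a self-contained restatement of the window math in `CUBIC::getCWnD()` above: K = cbrt(W_max·(1−β)/C) and W(t) = C·(t−K)³ + W_max, where t is the time since the last reduction. The constants mirror the header (β = 0.8, C = 0.4); the free function below is only for illustration and is not part of the project.

```cpp
#include <cmath>
#include <cstdio>

// cubic window: W(t) = C*(t-K)^3 + W_max, with K = cbrt(W_max*(1-BETA)/C)
double cubic_window(double window_max, double time_since_reduction,
                    double beta = 0.8, double scaling_constant = 0.4) {
    const double K  = std::cbrt(window_max * (1.0 - beta) / scaling_constant);
    const double TK = time_since_reduction - K;
    return scaling_constant * TK * TK * TK + window_max;
}

int main() {
    // at t=0 the window is beta*W_max; it climbs back to W_max at t=K and grows past it afterwards
    for (double t = 0.0; t <= 2.0; t += 0.5) {
        std::printf("t=%.1fs window=%.1f\n", t, cubic_window(40.0, t));
    }
}
```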
@@ -8,7 +8,8 @@ struct CUBIC : public FlowOnly {
     //using clock = std::chrono::steady_clock;
 
     public: // config
-        static constexpr float BETA {0.7f};
+        //static constexpr float BETA {0.7f};
+        static constexpr float BETA {0.8f};
         static constexpr float SCALING_CONSTANT {0.4f};
         static constexpr float RTT_EMA_ALPHA = 0.1f; // 0.1 is very smooth, might need more
 
@@ -16,9 +17,14 @@ struct CUBIC : public FlowOnly {
        // window size before last reduciton
         double _window_max {2.f * MAXIMUM_SEGMENT_SIZE}; // start with mss*2
         //double _window_last_max {2.f * MAXIMUM_SEGMENT_SIZE};
-        double _time_point_reduction {getTimeNow()};
+        double _time_since_reduction {12.f}; // warm start
+        int64_t _bytes_leftover {0};
 
     private:
+        void updateReductionTimer(float time_delta);
+        void resetReductionTimer(void);
+
         float getCWnD(void) const;
 
         // moving avg over the last few delay samples
@@ -31,11 +37,14 @@ struct CUBIC : public FlowOnly {
 
     public: // api
         CUBIC(size_t maximum_segment_data_size) : FlowOnly(maximum_segment_data_size) {}
+        virtual ~CUBIC(void) {}
+
+        float getWindow(void) override;
 
         // TODO: api for how much data we should send
         // take time since last sent into account
        // respect max_byterate_allowed
-        size_t canSend(void) override;
+        int64_t canSend(float time_delta) override;
 
        // get the list of timed out seq_ids
         //std::vector<SeqIDType> getTimeouts(void) const override;
@@ -6,10 +6,16 @@
 #include <algorithm>
 
 float FlowOnly::getCurrentDelay(void) const {
-    return std::min(_rtt_ema, RTT_MAX);
+    // below 1ms is useless
+    return std::clamp(_rtt_ema, 0.001f, RTT_MAX);
 }
 
 void FlowOnly::addRTT(float new_delay) {
+    if (new_delay > _rtt_ema * RTT_UP_MAX) {
+        // too large a jump up, to be taken into account
+        return;
+    }
+
     // lerp(new_delay, rtt_ema, 0.1)
     _rtt_ema = RTT_EMA_ALPHA * new_delay + (1.f - RTT_EMA_ALPHA) * _rtt_ema;
 }
@@ -23,7 +29,40 @@ void FlowOnly::updateWindow(void) {
     _fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
 }
 
-size_t FlowOnly::canSend(void) {
+void FlowOnly::updateCongestion(void) {
+    const auto tmp_window = getWindow();
+    // packet window * 0.3
+    // but atleast 4
+    int32_t max_consecutive_events = std::clamp<int32_t>(
+        (tmp_window/MAXIMUM_SEGMENT_DATA_SIZE) * 0.3f,
+        4,
+        50 // limit TODO: fix idle/time starved algo
+    );
+    // TODO: magic number
+
+#if 0
+    std::cout << "NGC_FT1 Flow: pkg out of order"
+        << " w:" << tmp_window
+        << " pw:" << tmp_window/MAXIMUM_SEGMENT_DATA_SIZE
+        << " coe:" << _consecutive_events
+        << " mcoe:" << max_consecutive_events
+        << "\n";
+#endif
+
+    if (_consecutive_events > max_consecutive_events) {
+        //std::cout << "CONGESTION! NGC_FT1 flow: pkg out of order\n";
+        onCongestion();
+
+        // TODO: set _consecutive_events to zero?
+    }
+}
+
+float FlowOnly::getWindow(void) {
+    updateWindow();
+    return _fwnd;
+}
+
+int64_t FlowOnly::canSend(float time_delta) {
     if (_in_flight.empty()) {
         assert(_in_flight_bytes == 0);
         return MAXIMUM_SEGMENT_DATA_SIZE;
@@ -31,16 +70,17 @@ size_t FlowOnly::canSend(void) {
 
     updateWindow();
 
-    const int64_t fspace = _fwnd - _in_flight_bytes;
+    int64_t fspace = _fwnd - _in_flight_bytes;
     if (fspace < MAXIMUM_SEGMENT_DATA_SIZE) {
         return 0u;
     }
 
-    // limit to whole packets
-    size_t space = std::floor(fspace / MAXIMUM_SEGMENT_DATA_SIZE)
-        * MAXIMUM_SEGMENT_DATA_SIZE;
+    // also limit to max sendrate per tick, which is usually smaller than window
+    // this is mostly to prevent spikes on empty windows
+    fspace = std::min<int64_t>(fspace, max_byterate_allowed * time_delta + 0.5f);
 
-    return space;
+    // limit to whole packets
+    return (fspace / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
 }
 
 std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
@@ -49,7 +89,7 @@ std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
     // after 3 rtt delay, we trigger timeout
     const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;
 
-    for (const auto& [seq, time_stamp, size] : _in_flight) {
+    for (const auto& [seq, time_stamp, size, _] : _in_flight) {
         if (now_adjusted > time_stamp) {
             list.push_back(seq);
         }
@@ -58,16 +98,28 @@ std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
     return list;
 }
 
+int64_t FlowOnly::inFlightCount(void) const {
+    return _in_flight.size();
+}
+
 void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
     if constexpr (true) {
         for (const auto& it : _in_flight) {
-            assert(std::get<0>(it) != seq);
+            assert(it.id != seq);
         }
     }
 
-    _in_flight.push_back({seq, getTimeNow(), data_size + SEGMENT_OVERHEAD});
-    _in_flight_bytes += data_size + SEGMENT_OVERHEAD;
-    //_recently_sent_bytes += data_size + SEGMENT_OVERHEAD;
+    const auto& new_entry = _in_flight.emplace_back(
+        FlyingBunch{
+            seq,
+            static_cast<float>(getTimeNow()),
+            data_size + SEGMENT_OVERHEAD,
+            false
+        }
+    );
+    _in_flight_bytes += new_entry.bytes;
+
+    _time_point_last_update = getTimeNow();
 }
 
 void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
@@ -78,28 +130,31 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
 
     const auto now {getTimeNow()};
 
+    _time_point_last_update = now;
+
     // first seq in seqs is the actual value, all extra are for redundency
     { // skip in ack is congestion event
         // 1. look at primary ack of packet
         auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq = seqs.front()](const auto& v) -> bool {
-            return std::get<0>(v) == seq;
+            return v.id == seq;
         });
-        if (it != _in_flight.end()) {
-            if (it != _in_flight.begin()) {
+        if (it != _in_flight.end() && !it->ignore) {
+            // find first non ignore, it should be the expected
+            auto first_it = std::find_if_not(_in_flight.cbegin(), _in_flight.cend(), [](const auto& v) -> bool { return v.ignore; });
+
+            if (first_it != _in_flight.cend() && it != first_it) {
                 // not next expected seq -> skip detected
-                std::cout << "CONGESTION out of order\n";
-                onCongestion();
-                //if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
-                    //_recently_lost_data = true;
-                    //_last_congestion_event = getTimeNow();
-                    //_last_congestion_rtt = getCurrentDelay();
-                //}
+
+                _consecutive_events++;
+                it->ignore = true; // only handle once
+
+                updateCongestion();
             } else {
                 // only mesure delay, if not a congestion
-                addRTT(now - std::get<1>(*it));
+                addRTT(now - it->timestamp);
+                _consecutive_events = 0;
             }
-        } else {
+        } else { // TOOD: if ! ignore too
             // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 #if 0
             // assume we got a duplicated packet
@@ -111,14 +166,14 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
 
     for (const auto& seq : seqs) {
         auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
-            return std::get<0>(v) == seq;
+            return v.id == seq;
         });
 
         if (it == _in_flight.end()) {
             continue; // not found, ignore
         } else {
             //most_recent = std::max(most_recent, std::get<1>(*it));
-            _in_flight_bytes -= std::get<2>(*it);
+            _in_flight_bytes -= it->bytes;
             assert(_in_flight_bytes >= 0);
             //_recently_acked_data += std::get<2>(*it);
             _in_flight.erase(it);
@@ -128,8 +183,8 @@ void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
 
 void FlowOnly::onLoss(SeqIDType seq, bool discard) {
     auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
-        assert(!std::isnan(std::get<1>(v)));
-        return std::get<0>(v) == seq;
+        assert(!std::isnan(v.timestamp));
+        return v.id == seq;
     });
 
     if (it == _in_flight.end()) {
@@ -137,24 +192,27 @@ void FlowOnly::onLoss(SeqIDType seq, bool discard) {
         return; // not found, ignore ??
     }
 
-    std::cerr << "FLOW loss\n";
+    //std::cerr << "FLOW loss\n";
 
     // "if data lost is not to be retransmitted"
     if (discard) {
-        _in_flight_bytes -= std::get<2>(*it);
+        _in_flight_bytes -= it->bytes;
         assert(_in_flight_bytes >= 0);
         _in_flight.erase(it);
+    } else {
+        // and not take into rtt
+        it->timestamp = getTimeNow();
+        it->ignore = true;
     }
-    // TODO: reset timestamp?
 
-#if 0 // temporarily disable ce for timeout
-    // at most once per rtt?
-    // TODO: use delay at event instead
-    if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
-        _recently_lost_data = true;
-        _last_congestion_event = getTimeNow();
-        _last_congestion_rtt = getCurrentDelay();
+    // usually after data arrived out-of-order/duplicate
+    if (!it->ignore) {
+        it->ignore = true; // only handle once
+        //_consecutive_events++;
+
+        //updateCongestion();
+        // this is usually a safe indicator for congestion/maxed connection
+        onCongestion();
     }
-#endif
 }
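A compact sketch of the two mechanisms the FlowOnly changes introduce: the filtered and clamped RTT EMA, and the "too many consecutive out-of-order acks" congestion trigger. The thresholds copy the values visible in the diff; the class below is a hypothetical reduction, not the real FlowOnly.

```cpp
#include <algorithm>
#include <cstdint>

struct FlowSketch {
    static constexpr float RTT_EMA_ALPHA = 0.001f;
    static constexpr float RTT_UP_MAX    = 3.0f;  // ignore samples that jump up too far
    static constexpr float RTT_MAX       = 2.0f;

    float rtt_ema {0.1f};
    int32_t consecutive_events {0};
    bool congestion_flag {false};

    float currentDelay() const {
        return std::clamp(rtt_ema, 0.001f, RTT_MAX); // below 1ms is useless
    }

    void addRTT(float new_delay) {
        if (new_delay > rtt_ema * RTT_UP_MAX) {
            return; // too large a jump up to be taken into account
        }
        rtt_ema = RTT_EMA_ALPHA * new_delay + (1.f - RTT_EMA_ALPHA) * rtt_ema;
    }

    // called when an ack arrives for a packet that is not the oldest one in flight
    void onOutOfOrderAck(float window_bytes, float segment_bytes) {
        ++consecutive_events;
        const int32_t max_events = std::clamp<int32_t>(
            int32_t((window_bytes / segment_bytes) * 0.3f), 4, 50);
        if (consecutive_events > max_events) {
            congestion_flag = true; // the real code calls onCongestion() here
        }
    }
};
```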
@@ -11,17 +11,10 @@ struct FlowOnly : public CCAI {
     using clock = std::chrono::steady_clock;
 
     public: // config
-        static constexpr float RTT_EMA_ALPHA = 0.1f; // might need over time
+        static constexpr float RTT_EMA_ALPHA = 0.001f; // might need change over time
+        static constexpr float RTT_UP_MAX = 3.0f; // how much larger a delay can be to be taken into account
         static constexpr float RTT_MAX = 2.f; // 2 sec is probably too much
 
-        //float max_byterate_allowed {100.f*1024*1024}; // 100MiB/s
-        float max_byterate_allowed {10.f*1024*1024}; // 10MiB/s
-        //float max_byterate_allowed {1.f*1024*1024}; // 1MiB/s
-        //float max_byterate_allowed {0.6f*1024*1024}; // 600KiB/s
-        //float max_byterate_allowed {0.5f*1024*1024}; // 500KiB/s
-        //float max_byterate_allowed {0.05f*1024*1024}; // 50KiB/s
-        //float max_byterate_allowed {0.15f*1024*1024}; // 150KiB/s
-
     protected:
         // initialize to low value, will get corrected very fast
         float _fwnd {0.01f * max_byterate_allowed}; // in bytes
@@ -30,11 +23,24 @@ struct FlowOnly : public CCAI {
         float _rtt_ema {0.1f};
 
         // list of sequence ids and timestamps of when they where sent (and payload size)
-        std::vector<std::tuple<SeqIDType, float, size_t>> _in_flight;
+        struct FlyingBunch {
+            SeqIDType id;
+            float timestamp;
+            size_t bytes;
+
+            // set to true if counted as ce or resent due to timeout
+            bool ignore {false};
+        };
+        std::vector<FlyingBunch> _in_flight;
         int64_t _in_flight_bytes {0};
 
+        int32_t _consecutive_events {0};
+
         clock::time_point _time_start_offset;
+
+        // used to clamp growth rate in the void
+        double _time_point_last_update {getTimeNow()};
 
     protected:
         // make values relative to algo start for readability (and precision)
         // get timestamp in seconds
@@ -44,7 +50,9 @@ struct FlowOnly : public CCAI {
 
         // moving avg over the last few delay samples
         // VERY sensitive to bundling acks
-        float getCurrentDelay(void) const;
+        float getCurrentDelay(void) const override;
+
+        float getWindow(void) override;
 
         void addRTT(float new_delay);
 
@@ -52,17 +60,23 @@ struct FlowOnly : public CCAI {
 
         virtual void onCongestion(void) {};
 
+        // internal logic, calls the onCongestion() event
+        void updateCongestion(void);
+
     public: // api
         FlowOnly(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {}
+        virtual ~FlowOnly(void) {}
 
         // TODO: api for how much data we should send
         // take time since last sent into account
        // respect max_byterate_allowed
-        size_t canSend(void) override;
+        int64_t canSend(float time_delta) override;
 
        // get the list of timed out seq_ids
         std::vector<SeqIDType> getTimeouts(void) const override;
+
+        int64_t inFlightCount(void) const override;
+
     public: // callbacks
         // data size is without overhead
         void onSent(SeqIDType seq, size_t data_size) override;
@@ -6,6 +6,7 @@
 #include <deque>
 #include <cstdint>
 #include <cassert>
+#include <tuple>
 
 #include <iomanip>
 #include <iostream>
@@ -19,7 +20,7 @@ LEDBAT::LEDBAT(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_siz
     _time_start_offset = clock::now();
 }
 
-size_t LEDBAT::canSend(void) {
+int64_t LEDBAT::canSend(float time_delta) {
     if (_in_flight.empty()) {
         return MAXIMUM_SEGMENT_DATA_SIZE;
     }
@@ -34,9 +35,7 @@ size_t LEDBAT::canSend(void) {
         return 0u;
     }
 
-    size_t space = std::ceil(std::min<float>(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
-
-    return space;
+    return std::ceil(std::min<float>(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;
 }
 
 std::vector<LEDBAT::SeqIDType> LEDBAT::getTimeouts(void) const {
@@ -11,7 +11,7 @@
 // LEDBAT++: https://www.ietf.org/archive/id/draft-irtf-iccrg-ledbat-plus-plus-01.txt
 
 // LEDBAT++ implementation
-struct LEDBAT : public CCAI{
+struct LEDBAT : public CCAI {
     public: // config
 #if 0
     using SeqIDType = std::pair<uint8_t, uint16_t>; // tf_id, seq_id
@@ -47,21 +47,20 @@ struct LEDBAT : public CCAI{
 
         //static constexpr size_t rtt_buffer_size_max {2000};
 
-        float max_byterate_allowed {10*1024*1024}; // 10MiB/s
-
     public:
         LEDBAT(size_t maximum_segment_data_size);
+        virtual ~LEDBAT(void) {}
 
         // return the current believed window in bytes of how much data can be inflight,
         // without overstepping the delay requirement
-        float getCWnD(void) const {
+        float getWindow(void) override {
             return _cwnd;
         }
 
         // TODO: api for how much data we should send
         // take time since last sent into account
        // respect max_byterate_allowed
-        size_t canSend(void) override;
+        int64_t canSend(float time_delta) override;
 
        // get the list of timed out seq_ids
         std::vector<SeqIDType> getTimeouts(void) const override;
@@ -86,7 +85,7 @@ struct LEDBAT : public CCAI{
 
         // moving avg over the last few delay samples
         // VERY sensitive to bundling acks
-        float getCurrentDelay(void) const;
+        float getCurrentDelay(void) const override;
 
         void addRTT(float new_delay);
 
@@ -1,9 +1,14 @@
 #include "./ngcft1.hpp"
 
-#include <solanaceae/toxcore/utils.hpp>
+#include "./flow_only.hpp"
+#include "./cubic.hpp"
+#include "./ledbat.hpp"
+
+#include <solanaceae/util/utils.hpp>
 
 #include <sodium.h>
 
+#include <cstdint>
 #include <iostream>
 #include <set>
 #include <algorithm>
@@ -71,6 +76,12 @@ bool NGCFT1::sendPKG_FT1_INIT_ACK(
     pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_INIT_ACK));
     pkg.push_back(transfer_id);
 
+    // - 2 bytes max_lossy_data_size
+    const uint16_t max_lossy_data_size = _t.toxGroupMaxCustomLossyPacketLength() - 4;
+    for (size_t i = 0; i < sizeof(uint16_t); i++) {
+        pkg.push_back((max_lossy_data_size>>(i*8)) & 0xff);
+    }
+
     // lossless
     return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
 }
@@ -87,6 +98,7 @@ bool NGCFT1::sendPKG_FT1_DATA(
     // check header_size+data_size <= max pkg size
 
     std::vector<uint8_t> pkg;
+    pkg.reserve(2048); // saves a ton of allocations
     pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA));
     pkg.push_back(transfer_id);
     pkg.push_back(sequence_id & 0xff);
@@ -107,6 +119,7 @@ bool NGCFT1::sendPKG_FT1_DATA_ACK(
     const uint16_t* seq_ids, size_t seq_ids_size
 ) {
     std::vector<uint8_t> pkg;
+    pkg.reserve(1+1+2*32); // 32acks in a single pkg should be unlikely
     pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA_ACK));
     pkg.push_back(transfer_id);
 
@@ -143,7 +156,7 @@ bool NGCFT1::sendPKG_FT1_MESSAGE(
     return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK;
 }
 
-void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set) {
+void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size) {
     auto& tf_opt = peer.send_transfers.at(idx);
     assert(tf_opt.has_value());
     auto& tf = tf_opt.value();
@@ -175,22 +188,36 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
             }
             //break;
             return;
-        case State::SENDING: {
+        case State::FINISHING: // we still have unacked packets
             tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
-                // no ack after 5 sec -> resend
-                //if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
-                if (timeouts_set.count({idx, id})) {
-                    // TODO: can fail
+                if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) {
                     sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
                     peer.cca->onLoss({idx, id}, false);
                     time_since_activity = 0.f;
                     timeouts_set.erase({idx, id});
+                    can_packet_size -= data.size();
                 }
             });
 
             if (tf.time_since_activity >= sending_give_up_after) {
                 // no ack after 30sec, close ft
-                std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting\n";
+                // TODO: notify app
+                std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
+
+                // clean up cca
+                tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
+                    peer.cca->onLoss({idx, id}, true);
+                    timeouts_set.erase({idx, id});
+                });
+
+                tf_opt.reset();
+            }
+            break;
+        case State::SENDING: {
+                // first handle overall timeout (could otherwise do resends directly before, which is useless)
+                // timeout increases with active transfers (otherwise we could starve them)
+                if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) {
+                    // no ack after 30sec, close ft
+                    std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting (ifc:" << peer.cca->inFlightCount() << ")\n";
                 dispatch(
                     NGCFT1_Event::send_done,
                     Events::NGCFT1_send_done{
@@ -210,22 +237,23 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
                     return;
                 }
 
+                // do resends
+                tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
+                    if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) {
+                        // TODO: can fail
+                        sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
+                        peer.cca->onLoss({idx, id}, false);
+                        time_since_activity = 0.f;
+                        timeouts_set.erase({idx, id});
+                        can_packet_size -= data.size();
+                    }
+                });
+
                 // if chunks in flight < window size (2)
-                //while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) {
-                int64_t can_packet_size {static_cast<int64_t>(peer.cca->canSend())};
-                //if (can_packet_size) {
-                    //std::cerr << "FT: can_packet_size: " << can_packet_size;
-                //}
-                size_t count {0};
                 while (can_packet_size > 0 && tf.file_size > 0) {
                     std::vector<uint8_t> new_data;
 
-                    // TODO: parameterize packet size? -> only if JF increases lossy packet size >:)
-                    //size_t chunk_size = std::min<size_t>(496u, tf.file_size - tf.file_size_current);
-                    //size_t chunk_size = std::min<size_t>(can_packet_size, tf.file_size - tf.file_size_current);
                     size_t chunk_size = std::min<size_t>({
-                        //496u,
-                        //996u,
                         peer.cca->MAXIMUM_SEGMENT_DATA_SIZE,
                         static_cast<size_t>(can_packet_size),
                         tf.file_size - tf.file_size_current
@@ -237,14 +265,6 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
 
                     new_data.resize(chunk_size);
 
-                    //ngc_ft1_ctx->cb_send_data[tf.file_kind](
-                        //tox,
-                        //group_number, peer_number,
-                        //idx,
-                        //tf.file_size_current,
-                        //new_data.data(), new_data.size(),
-                        //ngc_ft1_ctx->ud_send_data.count(tf.file_kind) ? ngc_ft1_ctx->ud_send_data.at(tf.file_kind) : nullptr
-                    //);
                     assert(idx <= 0xffu);
                     // TODO: check return value
                     dispatch(
@@ -258,45 +278,19 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
                     );
 
                     uint16_t seq_id = tf.ssb.add(std::move(new_data));
-                    sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
-                    peer.cca->onSent({idx, seq_id}, chunk_size);
-#if defined(EXTRA_LOGGING) && EXTRA_LOGGING == 1
-                    fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id);
-#endif
+                    const bool sent = sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
+                    if (sent) {
+                        peer.cca->onSent({idx, seq_id}, chunk_size);
+                    } else {
+                        std::cerr << "NGCFT1: failed to send packet (queue full?) --------------\n";
+                        peer.cca->onLoss({idx, seq_id}, false); // HACK: fake congestion event
+                        // TODO: onCongestion
+                        can_packet_size = 0;
+                    }
 
                     tf.file_size_current += chunk_size;
                     can_packet_size -= chunk_size;
-                    count++;
                 }
-                //if (count) {
-                    //std::cerr << " split over " << count << "\n";
-                //}
-            }
-            break;
-        case State::FINISHING: // we still have unacked packets
-            tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
-                // no ack after 5 sec -> resend
-                //if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
-                if (timeouts_set.count({idx, id})) {
-                    sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
-                    peer.cca->onLoss({idx, id}, false);
-                    time_since_activity = 0.f;
-                    timeouts_set.erase({idx, id});
-                }
-            });
-            if (tf.time_since_activity >= sending_give_up_after) {
-                // no ack after 30sec, close ft
-                // TODO: notify app
-                std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
-
-                // clean up cca
-                tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
-                    peer.cca->onLoss({idx, id}, true);
-                    timeouts_set.erase({idx, id});
-                });
-
-                tf_opt.reset();
             }
             break;
         default: // invalid state, delete
|
|||||||
}
|
}
|
||||||
|
|
||||||
void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
|
void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
|
||||||
|
if (peer.cca) {
|
||||||
auto timeouts = peer.cca->getTimeouts();
|
auto timeouts = peer.cca->getTimeouts();
|
||||||
std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
|
std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
|
||||||
|
|
||||||
for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) {
|
int64_t can_packet_size {peer.cca->canSend(time_delta)}; // might get more space while iterating (time)
|
||||||
|
|
||||||
|
// get number current running transfers TODO: improve
|
||||||
|
peer.active_send_transfers = 0;
|
||||||
|
for (const auto& it : peer.send_transfers) {
|
||||||
|
if (it.has_value()) {
|
||||||
|
peer.active_send_transfers++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// change iterate start position to not starve transfers in the back
|
||||||
|
size_t iterated_count = 0;
|
||||||
|
bool last_send_found = false;
|
||||||
|
for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) {
|
||||||
|
idx = idx % peer.send_transfers.size();
|
||||||
|
|
||||||
if (peer.send_transfers.at(idx).has_value()) {
|
if (peer.send_transfers.at(idx).has_value()) {
|
||||||
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set);
|
if (!last_send_found && can_packet_size <= 0) {
|
||||||
|
peer.next_send_transfer_send_idx = idx;
|
||||||
|
last_send_found = true; // only set once
|
||||||
|
}
|
||||||
|
updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -333,15 +348,42 @@ NGCFT1::NGCFT1(
|
|||||||
_neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK);
|
_neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK);
|
||||||
_neep.subscribe(this, NGCEXT_Event::FT1_MESSAGE);
|
_neep.subscribe(this, NGCEXT_Event::FT1_MESSAGE);
|
||||||
|
|
||||||
_tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_PEER_EXIT);
|
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
void NGCFT1::iterate(float time_delta) {
|
float NGCFT1::iterate(float time_delta) {
|
||||||
|
bool transfer_in_progress {false};
|
||||||
for (auto& [group_number, group] : groups) {
|
for (auto& [group_number, group] : groups) {
|
||||||
for (auto& [peer_number, peer] : group.peers) {
|
for (auto& [peer_number, peer] : group.peers) {
|
||||||
iteratePeer(time_delta, group_number, peer_number, peer);
|
iteratePeer(time_delta, group_number, peer_number, peer);
|
||||||
|
|
||||||
|
// find any active transfer
|
||||||
|
if (!transfer_in_progress) {
|
||||||
|
for (const auto& t : peer.send_transfers) {
|
||||||
|
if (t.has_value()) {
|
||||||
|
transfer_in_progress = true;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if (!transfer_in_progress) {
|
||||||
|
for (const auto& t : peer.recv_transfers) {
|
||||||
|
if (t.has_value()) {
|
||||||
|
transfer_in_progress = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (transfer_in_progress) {
|
||||||
|
// ~15ms for up to 1mb/s
|
||||||
|
// ~5ms for up to 4mb/s
|
||||||
|
return 0.005f; // 5ms
|
||||||
|
} else {
|
||||||
|
return 1.f; // once a sec might be too little
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void NGCFT1::NGC_FT1_send_request_private(
|
void NGCFT1::NGC_FT1_send_request_private(
|
||||||
@ -487,7 +529,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
|
|||||||
|
|
||||||
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
|
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
|
||||||
//#if !NDEBUG
|
//#if !NDEBUG
|
||||||
std::cout << "NGCFT1: FT1_INIT_ACK\n";
|
std::cout << "NGCFT1: FT1_INIT_ACK mds:" << e.max_lossy_data_size << "\n";
|
||||||
//#endif
|
//#endif
|
||||||
|
|
||||||
// we now should start sending data
|
// we now should start sending data
|
||||||
@ -507,10 +549,28 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
|
|||||||
|
|
||||||
using State = Group::Peer::SendTransfer::State;
|
using State = Group::Peer::SendTransfer::State;
|
||||||
if (transfer.state != State::INIT_SENT) {
|
if (transfer.state != State::INIT_SENT) {
|
||||||
std::cerr << "NGCFT1 error: inti_ack but not in INIT_SENT state\n";
|
std::cerr << "NGCFT1 error: init_ack but not in INIT_SENT state\n";
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// negotiated packet_data_size
|
||||||
|
const auto negotiated_packet_data_size = std::min<uint32_t>(e.max_lossy_data_size, _t.toxGroupMaxCustomLossyPacketLength()-4);
|
||||||
|
// TODO: reset cca with new pkg size
|
||||||
|
if (!peer.cca) {
|
||||||
|
// make random max of [1020-1220]
|
||||||
|
const uint32_t random_max_data_size = (1024-4) + _rng()%201;
|
||||||
|
const uint32_t randomized_negotiated_packet_data_size = std::min(negotiated_packet_data_size, random_max_data_size);
|
||||||
|
|
||||||
|
peer.max_packet_data_size = randomized_negotiated_packet_data_size;
|
||||||
|
|
||||||
|
std::cerr << "NGCFT1: creating cca with max:" << peer.max_packet_data_size << "\n";
|
||||||
|
|
||||||
|
peer.cca = std::make_unique<CUBIC>(peer.max_packet_data_size);
|
||||||
|
//peer.cca = std::make_unique<LEDBAT>(peer.max_packet_data_size);
|
||||||
|
//peer.cca = std::make_unique<FlowOnly>(peer.max_packet_data_size);
|
||||||
|
//peer.cca->max_byterate_allowed = 1.f *1024*1024;
|
||||||
|
}
|
||||||
|
|
||||||
// iterate will now call NGC_FT1_send_data_cb
|
// iterate will now call NGC_FT1_send_data_cb
|
||||||
transfer.state = State::SENDING;
|
transfer.state = State::SENDING;
|
||||||
transfer.time_since_activity = 0.f;
|
transfer.time_since_activity = 0.f;
|
||||||
@ -520,7 +580,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
|
|||||||
|
|
||||||
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
|
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
|
||||||
#if !NDEBUG
|
#if !NDEBUG
|
||||||
std::cout << "NGCFT1: FT1_DATA\n";
|
//std::cout << "NGCFT1: FT1_DATA\n";
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (e.data.empty()) {
|
if (e.data.empty()) {
|
||||||
@ -599,7 +659,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
|
|||||||
|
|
||||||
Group::Peer& peer = groups[e.group_number].peers[e.peer_number];
|
Group::Peer& peer = groups[e.group_number].peers[e.peer_number];
|
||||||
if (!peer.send_transfers[e.transfer_id].has_value()) {
|
if (!peer.send_transfers[e.transfer_id].has_value()) {
|
||||||
std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n";
|
// we delete directly, packets might still be in flight (in practice they are when ce)
|
||||||
|
//std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n";
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -625,7 +686,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
|
|||||||
|
|
||||||
// delete if all packets acked
|
// delete if all packets acked
|
||||||
if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) {
|
if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) {
|
||||||
std::cout << "NGCFT1: " << int(e.transfer_id) << " done\n";
|
std::cout << "NGCFT1: " << int(e.transfer_id) << " done. wnd:" << peer.cca->getWindow() << "\n";
|
||||||
dispatch(
|
dispatch(
|
||||||
NGCFT1_Event::send_done,
|
NGCFT1_Event::send_done,
|
||||||
Events::NGCFT1_send_done{
|
Events::NGCFT1_send_done{
|
||||||
@ -711,7 +772,7 @@ bool NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// reset cca
|
// reset cca
|
||||||
peer.cca = std::make_unique<CUBIC>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
|
peer.cca.reset(); // dont actually reallocate
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -2,22 +2,22 @@
|
|||||||
|
|
||||||
// solanaceae port of tox_ngc_ft1
|
// solanaceae port of tox_ngc_ft1
|
||||||
|
|
||||||
#include <solanaceae/toxcore/tox_interface.hpp>
|
|
||||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||||
|
#include <solanaceae/toxcore/tox_interface.hpp>
|
||||||
|
|
||||||
#include <solanaceae/ngc_ext/ngcext.hpp>
|
#include <solanaceae/ngc_ext/ngcext.hpp>
|
||||||
#include "./cubic.hpp"
|
#include "./cca.hpp"
|
||||||
//#include "./flow_only.hpp"
|
|
||||||
//#include "./ledbat.hpp"
|
|
||||||
|
|
||||||
#include "./rcv_buf.hpp"
|
#include "./rcv_buf.hpp"
|
||||||
#include "./snd_buf.hpp"
|
#include "./snd_buf.hpp"
|
||||||
|
|
||||||
#include "./ngcft1_file_kind.hpp"
|
#include "./ngcft1_file_kind.hpp"
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <random>
|
||||||
|
|
||||||
namespace Events {
|
namespace Events {
|
||||||
|
|
||||||
@ -133,15 +133,19 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
|
|||||||
ToxEventProviderI& _tep;
|
ToxEventProviderI& _tep;
|
||||||
NGCEXTEventProviderI& _neep;
|
NGCEXTEventProviderI& _neep;
|
||||||
|
|
||||||
|
std::default_random_engine _rng{std::random_device{}()};
|
||||||
|
|
||||||
// TODO: config
|
// TODO: config
|
||||||
size_t acks_per_packet {3u}; // 3
|
size_t acks_per_packet {3u}; // 3
|
||||||
float init_retry_timeout_after {5.f}; // 10sec
|
float init_retry_timeout_after {4.f};
|
||||||
float sending_give_up_after {30.f}; // 30sec
|
float sending_give_up_after {15.f}; // 30sec (per active transfer)
|
||||||
|
|
||||||
|
|
||||||
struct Group {
|
struct Group {
|
||||||
struct Peer {
|
struct Peer {
|
||||||
std::unique_ptr<CCAI> cca = std::make_unique<CUBIC>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
|
uint32_t max_packet_data_size {500-4};
|
||||||
|
//std::unique_ptr<CCAI> cca = std::make_unique<CUBIC>(max_packet_data_size); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
|
||||||
|
std::unique_ptr<CCAI> cca;
|
||||||
|
|
||||||
struct RecvTransfer {
|
struct RecvTransfer {
|
||||||
uint32_t file_kind;
|
uint32_t file_kind;
|
||||||
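
Aside on the Peer changes above: the congestion controller is no longer constructed eagerly with a hardcoded 500-4 payload size; the peer now remembers max_packet_data_size and leaves cca empty until it is actually needed (and can simply reset() it on peer exit). A minimal sketch of such lazy construction, with stand-in CCAI/CUBIC types and a hypothetical getCCA() helper that are not part of this diff:

#include <cstdint>
#include <memory>

// stand-ins for the real interfaces from cca.hpp / cubic.hpp
struct CCAI { virtual ~CCAI() = default; };
struct CUBIC : CCAI { explicit CUBIC(uint32_t max_data_size) : mds(max_data_size) {} uint32_t mds; };

struct Peer {
    uint32_t max_packet_data_size {500-4};
    std::unique_ptr<CCAI> cca; // empty until first use, reset() on peer exit

    // hypothetical helper: allocate on demand with the currently known payload size
    CCAI& getCCA(void) {
        if (!cca) {
            cca = std::make_unique<CUBIC>(max_packet_data_size);
        }
        return *cca;
    }
};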
@@ -188,6 +192,9 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
 };
 std::array<std::optional<SendTransfer>, 256> send_transfers;
 size_t next_send_transfer_idx {0}; // next id will be 0
+size_t next_send_transfer_send_idx {0};
+
+size_t active_send_transfers {0};
 };
 std::map<uint32_t, Peer> peers;
 };
@@ -201,7 +208,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
 bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size);
 bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size);

-void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set);
+void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
 void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);

 public:
@@ -211,7 +218,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
 NGCEXTEventProviderI& neep
 );

-void iterate(float delta);
+float iterate(float delta);

 public: // ft1 api
 // TODO: public variant?
@@ -1,58 +1,76 @@
 #pragma once

-#include <solanaceae/message3/file.hpp>
+#include <solanaceae/file/file2.hpp>

 #include "./mio.hpp"

 #include <filesystem>
 #include <fstream>
+#include <iostream>
 #include <cstring>

-struct FileRWMapped : public FileI {
+struct File2RWMapped : public File2I {
 mio::ummap_sink _file_map;

 // TODO: add truncate support?
-FileRWMapped(std::string_view file_path, uint64_t file_size) {
-_file_size = file_size;
+// TODO: rw always true?
+File2RWMapped(std::string_view file_path, int64_t file_size = -1) : File2I(true, true) {
+std::filesystem::path native_file_path{file_path};

-if (!std::filesystem::exists(file_path)) {
-std::ofstream(std::string{file_path}) << '\0'; // force create the file
+if (!std::filesystem::exists(native_file_path)) {
+std::ofstream(native_file_path) << '\0'; // force create the file
+}
+
+_file_size = std::filesystem::file_size(native_file_path);
+if (file_size >= 0 && _file_size != file_size) {
+_file_size = file_size;
+std::filesystem::resize_file(native_file_path, file_size); // ensure size, usually sparse
 }
-std::filesystem::resize_file(file_path, file_size); // ensure size, usually sparse

 std::error_code err;
 // sink, is also read
-_file_map.map(std::string{file_path}, 0, file_size, err);
+_file_map.map(native_file_path.u8string(), 0, _file_size, err);

 if (err) {
-// TODO: errro
+std::cerr << "FileRWMapped error: mapping file failed " << err << "\n";
 return;
 }
 }

-virtual ~FileRWMapped(void) override {}
+virtual ~File2RWMapped(void) override {}

 bool isGood(void) override {
 return _file_map.is_mapped();
 }

-std::vector<uint8_t> read(uint64_t pos, uint64_t size) override {
-if (pos+size > _file_size) {
-//assert(false && "read past end");
-return {};
-}
-
-return {_file_map.data()+pos, _file_map.data()+(pos+size)};
-}
-
-bool write(uint64_t pos, const std::vector<uint8_t>& data) override {
-if (pos+data.size() > _file_size) {
+bool write(const ByteSpan data, int64_t pos = -1) override {
+// TODO: support streaming write
+if (pos < 0) {
 return false;
 }

-std::memcpy(_file_map.data()+pos, data.data(), data.size());
+if (data.empty()) {
+return true; // false?
+}
+
+// file size is fix for mmaped files
+if (pos+data.size > _file_size) {
+return false;
+}
+
+std::memcpy(_file_map.data()+pos, data.ptr, data.size);

 return true;
 }

+ByteSpanWithOwnership read(uint64_t size, int64_t pos = -1) override {
+if (pos+size > _file_size) {
+//assert(false && "read past end");
+return ByteSpan{};
+}
+
+// return non-owning
+return ByteSpan{_file_map.data()+pos, size};
+}
 };
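
For orientation, the rewritten interface above takes the read/write size first and the (optional) position last, and hands out ByteSpan views into the mapping instead of copying into std::vector. A rough usage sketch, assuming ByteSpan is a pointer+length pair with .ptr/.size members constructible from (pointer, size) as the diff suggests; the file name, size and offsets here are made up:

#include <cassert>
#include <cstdint>
// assumes the File2RWMapped header shown above (and its File2I/ByteSpan dependencies) is included

void file2_rw_mapped_example(void) {
    File2RWMapped file{"example.bin", 1024}; // creates the file if missing and maps 1 KiB
    if (!file.isGood()) {
        return;
    }

    const uint8_t payload[4] {0xde, 0xad, 0xbe, 0xef};
    file.write(ByteSpan{payload, sizeof(payload)}, 512); // 4 bytes at offset 512

    auto view = file.read(4, 512); // non-owning view into the mapping
    assert(view.size == 4);
    // the view is only valid while 'file' (and therefore the mapping) is alive
}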
@@ -1,14 +1,12 @@
 #include "./sha1_ngcft1.hpp"

-#include <solanaceae/toxcore/utils.hpp>
+#include <solanaceae/util/utils.hpp>

 #include <solanaceae/contact/components.hpp>
 #include <solanaceae/tox_contacts/components.hpp>
 #include <solanaceae/message3/components.hpp>
 #include <solanaceae/tox_messages/components.hpp>

-#include <solanaceae/message3/file_r_file.hpp>
-
 #include "./ft1_sha1_info.hpp"
 #include "./hash_utils.hpp"

@@ -26,11 +24,11 @@

 namespace Message::Components {

-using Content = ContentHandle;
+using Content = ObjectHandle;

 } // Message::Components

-// TODO: rename to content components
+// TODO: rename to object components
 namespace Components {

 struct Messages {
@@ -112,8 +110,7 @@ static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
 }
 }

-void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash) {
-// TODO: transfers
+void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash) {
 for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) {
 // if already in queue
 if (i_g == group_number && i_p == peer_number && i_h == hash) {
@@ -123,6 +120,32 @@ void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_numbe
 }
 }

+// check for running transfer
+if (_sending_transfers.count(combineIds(group_number, peer_number))) {
+for (const auto& [_, transfer] : _sending_transfers.at(combineIds(group_number, peer_number))) {
+if (std::holds_alternative<SendingTransfer::Info>(transfer.v)) {
+// ignore info
+continue;
+}
+
+const auto& t_c = std::get<SendingTransfer::Chunk>(transfer.v);
+
+if (content != t_c.content) {
+// ignore different content
+continue;
+}
+
+auto chunk_idx_vec = content.get<Components::FT1ChunkSHA1Cache>().chunkIndices(hash);
+
+for (size_t idx : chunk_idx_vec) {
+if (idx == t_c.chunk_index) {
+// already sending
+return; // skip
+}
+}
+}
+}
+
 // not in queue yet
 _queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, content, hash, 0.f));
 }
@@ -131,7 +154,7 @@ uint64_t SHA1_NGCFT1::combineIds(const uint32_t group_number, const uint32_t pee
 return (uint64_t(group_number) << 32) | peer_number;
 }
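
combineIds() packs both ids into the single 64-bit key used by _sending_transfers and _receiving_transfers: the group number occupies the upper 32 bits, the peer number the lower 32. A quick check of the arithmetic:

#include <cstdint>

// group_number = 3, peer_number = 7
static_assert(((uint64_t(3) << 32) | uint64_t(7)) == 0x0000000300000007ull,
              "upper 32 bits carry the group number, lower 32 bits the peer number");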

-void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
+void SHA1_NGCFT1::updateMessages(ObjectHandle ce) {
 assert(ce.all_of<Components::Messages>());

 for (auto msg : ce.get<Components::Messages>().messages) {
@@ -160,7 +183,7 @@ void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
 }
 }

-std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ContentHandle ce) {
+std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ObjectHandle ce) {
 // get a list of peers we can request this file from
 // TODO: randomly request from non SuspectedParticipants
 std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
@@ -194,6 +217,10 @@ std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(C
 continue;
 }

+if (_cr.all_of<Contact::Components::TagSelfStrong>(child)) {
+continue; // skip self
+}
+
 if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
 const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
 un_tox_peers.push_back(tgpe.peer_number);
@@ -219,11 +246,13 @@ std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(C
 }

 SHA1_NGCFT1::SHA1_NGCFT1(
+ObjectStore2& os,
 Contact3Registry& cr,
 RegistryMessageModel& rmm,
 NGCFT1& nft,
 ToxContactModel2& tcm
 ) :
+_os(os),
 _cr(cr),
 _rmm(rmm),
 _nft(nft),
@@ -266,8 +295,9 @@ void SHA1_NGCFT1::iterate(float delta) {
 for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
 it->second.time_since_activity += delta;

-// if we have not heard for 10sec, timeout
-if (it->second.time_since_activity >= 10.f) {
+// if we have not heard for 2min, timeout (lower level event on real timeout)
+// TODO: do we really need this if we get events?
+if (it->second.time_since_activity >= 120.f) {
 std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n";
 it = peer_it->second.erase(it);
 } else {
@@ -320,8 +350,8 @@ void SHA1_NGCFT1::iterate(float delta) {
 }

 { // requested info timers
-std::vector<Content> timed_out;
-_contentr.view<Components::ReRequestInfoTimer>().each([delta, &timed_out](Content e, Components::ReRequestInfoTimer& rrit) {
+std::vector<Object> timed_out;
+_os.registry().view<Components::ReRequestInfoTimer>().each([delta, &timed_out](Object e, Components::ReRequestInfoTimer& rrit) {
 rrit.timer += delta;

 // 15sec, TODO: config
@@ -331,12 +361,13 @@
 });
 for (const auto e : timed_out) {
 // TODO: avoid dups
-_queue_content_want_info.push_back({_contentr, e});
-_contentr.remove<Components::ReRequestInfoTimer>(e);
+_queue_content_want_info.push_back(_os.objectHandle(e));
+_os.registry().remove<Components::ReRequestInfoTimer>(e);
+// TODO: throw update?
 }
 }
 { // requested chunk timers
-_contentr.view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
+_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
 for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
 it->second += delta;

@@ -522,11 +553,11 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 // first, open file for write(+readback)
 std::string full_file_path{e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path};
 // TODO: replace with filesystem or something
-// TODO: ensure dir exists
 if (full_file_path.back() != '/') {
 full_file_path += "/";
 }

+// ensure dir exists
 std::filesystem::create_directories(full_file_path);

 const auto& info = ce.get<Components::FT1InfoSHA1>();
@@ -534,14 +565,13 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {

 ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{full_file_path});

-std::unique_ptr<FileRWMapped> file_impl;
 const bool file_exists = std::filesystem::exists(full_file_path);
-file_impl = std::make_unique<FileRWMapped>(full_file_path, info.file_size);
+std::unique_ptr<File2I> file_impl = std::make_unique<File2RWMapped>(full_file_path, info.file_size);

 if (!file_impl->isGood()) {
 std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
-//e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop
+// we failed opening that filepath, so we should offer the user the oportunity to save it differently
+e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop
 return false;
 }

@@ -557,12 +587,11 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 // iterate existing file
 for (size_t i = 0; i < info.chunks.size(); i++) {
 const uint64_t chunk_size = info.chunkSize(i);
-auto existing_data = file_impl->read(i*uint64_t(info.chunk_size), chunk_size);
-assert(existing_data.size() == chunk_size);
+auto existing_data = file_impl->read(chunk_size, i*uint64_t(info.chunk_size));

-// TODO: avoid copy
-const auto data_hash = SHA1Digest{hash_sha1(existing_data.data(), existing_data.size())};
+assert(existing_data.size == chunk_size);
+if (existing_data.size == chunk_size) {
+const auto data_hash = SHA1Digest{hash_sha1(existing_data.ptr, existing_data.size)};
 const bool data_equal = data_hash == info.chunks.at(i);

 cc.have_chunk.push_back(data_equal);
@@ -574,6 +603,9 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 } else {
 //std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n";
 }
+} else {
+// error reading?
+}

 _chunks[info.chunks[i]] = ce;
 cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
@@ -801,9 +833,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
 for (const auto chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
 const auto offset_into_file = chunk_index* ce.get<Components::FT1InfoSHA1>().chunk_size;

-// TODO: avoid temporary copy
-// TODO: check return
-if (!file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size})) {
+if (!file->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
 std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
 }
 }
@@ -841,14 +871,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
 auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
 const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
 // TODO: should we really use file?
-const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read((chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset, e.data_size);
+const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read(
+e.data_size,
+(chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset
+);

 // TODO: optimize
-for (size_t i = 0; i < e.data_size && i < data.size(); i++) {
+for (size_t i = 0; i < e.data_size && i < data.size; i++) {
 e.data[i] = data[i];
 }

-chunk_transfer.content.get_or_emplace<Message::Components::Transfer::BytesSent>().total += data.size();
+chunk_transfer.content.get_or_emplace<Message::Components::Transfer::BytesSent>().total += data.size;
 // TODO: add event to propergate to messages
 //_rmm.throwEventUpdate(transfer); // should we?

@@ -929,10 +962,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 const auto chunk_size = info.chunkSize(chunk_index);
 assert(offset_into_file+chunk_size <= info.file_size);

-const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(offset_into_file, chunk_size);
+const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(chunk_size, offset_into_file);
+assert(!chunk_data.empty());

 // check hash of chunk
-auto got_hash = hash_sha1(chunk_data.data(), chunk_data.size());
+auto got_hash = hash_sha1(chunk_data.ptr, chunk_data.size);
 if (info.chunks.at(chunk_index) == got_hash) {
 std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";

@@ -954,7 +988,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 // HACK: remap file, to clear ram

 // TODO: error checking
-ce.get<Message::Components::Transfer::File>() = std::make_unique<FileRWMapped>(
+ce.get<Message::Components::Transfer::File>() = std::make_unique<File2RWMapped>(
 ce.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(),
 info.file_size
 );
@@ -962,7 +996,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {

 // good chunk
 // TODO: have wasted + metadata
-ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size();
+ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size;
 }
 }
 } else {
@@ -1042,19 +1076,29 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
 //reg.emplace<Components::TimestampWritten>(new_msg_e, 0);
 reg.emplace<Message::Components::Timestamp>(new_msg_e, ts); // reactive?

+reg.emplace<Message::Components::TagUnread>(new_msg_e);
+
 { // by whom
-auto& synced_by = reg.get_or_emplace<Message::Components::SyncedBy>(new_msg_e).list;
-synced_by.emplace(self_c);
+reg.get_or_emplace<Message::Components::SyncedBy>(new_msg_e).ts.try_emplace(self_c, ts);
+}
+
+{ // we received it, so we have it
+auto& rb = reg.get_or_emplace<Message::Components::ReceivedBy>(new_msg_e).ts;
+rb.try_emplace(c, ts);
+// TODO: how do we handle partial files???
+// tox ft rn only sets self if the file was received fully
+rb.try_emplace(self_c, ts);
 }

 // check if content exists
 const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
-ContentHandle ce;
+ObjectHandle ce;
 if (_info_to_content.count(sha1_info_hash)) {
 ce = _info_to_content.at(sha1_info_hash);
 std::cout << "SHA1_NGCFT1: new message has existing content\n";
 } else {
-ce = {_contentr, _contentr.create()};
+// TODO: backend
+ce = {_os.registry(), _os.registry().create()};
 _info_to_content[sha1_info_hash] = ce;
 std::cout << "SHA1_NGCFT1: new message has new content\n";

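
The SyncedBy rewrite above (and the new ReceivedBy component) replaces the plain contact set with a per-contact timestamp map, which is what the .ts.try_emplace(contact, ts) calls rely on. A minimal sketch of the assumed component shape; the real components live in solanaceae/message3 and the key/timestamp types here are guesses, not taken from this diff:

#include <cstdint>
#include <map>

struct SyncedBy {
    // contact -> timestamp at which that contact synced the message
    std::map<uint32_t /*contact*/, uint64_t /*ts*/> ts;
};

struct ReceivedBy {
    // contact -> timestamp at which that contact received the message/file
    std::map<uint32_t /*contact*/, uint64_t /*ts*/> ts;
};

// mirrors the calls in the diff:
//   reg.get_or_emplace<Message::Components::SyncedBy>(new_msg_e).ts.try_emplace(self_c, ts);
//   reg.get_or_emplace<Message::Components::ReceivedBy>(new_msg_e).ts.try_emplace(c, ts);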
@@ -1142,9 +1186,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 file_name_ = std::string(file_name),
 file_path_ = std::string(file_path)
 ]() mutable {
-// TODO: rw?
-// TODO: memory mapped would be king
-auto file_impl = std::make_unique<FileRFile>(file_path_);
+auto file_impl = std::make_unique<File2RWMapped>(file_path_, -1);
 if (!file_impl->isGood()) {
 {
 std::lock_guard l{self->_info_builder_queue_mutex};
@@ -1163,19 +1205,19 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 FT1InfoSHA1 sha1_info;
 // build info
 sha1_info.file_name = file_name_;
-sha1_info.file_size = file_impl->_file_size;
+sha1_info.file_size = file_impl->_file_size; // TODO: remove the reliance on implementation details

 { // build chunks
 // HACK: load file fully
-// TODO: the speed is truly horrid
-const auto file_data = file_impl->read(0, file_impl->_file_size);
+// ... its only a hack if its not memory mapped, but reading in chunk_sized chunks is probably a good idea anyway
+const auto file_data = file_impl->read(file_impl->_file_size, 0);
 size_t i = 0;
-for (; i + sha1_info.chunk_size < file_data.size(); i += sha1_info.chunk_size) {
-sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, sha1_info.chunk_size));
+for (; i + sha1_info.chunk_size < file_data.size; i += sha1_info.chunk_size) {
+sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, sha1_info.chunk_size));
 }

-if (i < file_data.size()) {
-sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, file_data.size()-i));
+if (i < file_data.size) {
+sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, file_data.size-i));
 }
 }

@@ -1194,7 +1236,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 ]() mutable { //
 // back on iterate thread

-auto file_impl = std::make_unique<FileRFile>(file_path_);
+auto file_impl = std::make_unique<File2RWMapped>(file_path_, sha1_info.file_size);
 if (!file_impl->isGood()) {
 std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n";
 return;
@@ -1211,7 +1253,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n";

 // check if content exists
-ContentHandle ce;
+ObjectHandle ce;
 if (self->_info_to_content.count(sha1_info_hash)) {
 ce = self->_info_to_content.at(sha1_info_hash);

@@ -1280,7 +1322,8 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 it = self->_queue_content_want_chunk.erase(it);
 }
 } else {
-ce = {self->_contentr, self->_contentr.create()};
+// TODO: backend
+ce = {self->_os.registry(), self->_os.registry().create()};
 self->_info_to_content[sha1_info_hash] = ce;

 ce.emplace<Components::FT1InfoSHA1>(sha1_info);
@@ -1325,6 +1368,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
 reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
 reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
+reg_ptr->emplace<Message::Components::Read>(msg_e, ts);

 reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e);
 reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
@@ -1357,10 +1401,6 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 // TODO: check return
 self->_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size());
 reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
-
-// TODO: generalize?
-auto& synced_by = reg_ptr->emplace<Message::Components::SyncedBy>(msg_e).list;
-synced_by.emplace(c_self);
 } else if (
 // non online group
 self->_cr.any_of<Contact::Components::ToxGroupPersistent>(c)
@@ -1368,12 +1408,11 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 // create msg_id
 const uint32_t message_id = randombytes_random();
 reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);

-// TODO: generalize?
-auto& synced_by = reg_ptr->emplace<Message::Components::SyncedBy>(msg_e).list;
-synced_by.emplace(c_self);
 }

+reg_ptr->get_or_emplace<Message::Components::SyncedBy>(msg_e).ts.try_emplace(c_self, ts);
+reg_ptr->get_or_emplace<Message::Components::ReceivedBy>(msg_e).ts.try_emplace(c_self, ts);

 self->_rmm.throwEventConstruct(*reg_ptr, msg_e);

 // TODO: place in iterate?
@@ -2,6 +2,7 @@

 // solanaceae port of sha1 fts for NGCFT1

+#include <solanaceae/object_store/object_store.hpp>
 #include <solanaceae/contact/contact_model3.hpp>
 #include <solanaceae/message3/registry_message_model.hpp>
 #include <solanaceae/tox_contacts/tox_contact_model2.hpp>
@@ -20,11 +21,9 @@
 #include <mutex>
 #include <list>

-enum class Content : uint32_t {};
-using ContentRegistry = entt::basic_registry<Content>;
-using ContentHandle = entt::basic_handle<ContentRegistry>;
-
 class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
+ObjectStore2& _os;
+// TODO: backend abstraction
 Contact3Registry& _cr;
 RegistryMessageModel& _rmm;
 NGCFT1& _nft;
@@ -32,21 +31,18 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {

 std::minstd_rand _rng {1337*11};

-// registry per group?
-ContentRegistry _contentr;
-
 // limit this to each group?
-entt::dense_map<SHA1Digest, ContentHandle> _info_to_content;
+entt::dense_map<SHA1Digest, ObjectHandle> _info_to_content;

 // sha1 chunk index
 // TODO: optimize lookup
 // TODO: multiple contents. hashes might be unique, but data is not
-entt::dense_map<SHA1Digest, ContentHandle> _chunks;
+entt::dense_map<SHA1Digest, ObjectHandle> _chunks;

 // group_number, peer_number, content, chunk_hash, timer
-std::deque<std::tuple<uint32_t, uint32_t, ContentHandle, SHA1Digest, float>> _queue_requested_chunk;
+std::deque<std::tuple<uint32_t, uint32_t, ObjectHandle, SHA1Digest, float>> _queue_requested_chunk;
 //void queueUpRequestInfo(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash);
-void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash);
+void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash);

 struct SendingTransfer {
 struct Info {
@@ -56,7 +52,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
 };

 struct Chunk {
-ContentHandle content;
+ObjectHandle content;
 size_t chunk_index; // <.< remove offset_into_file
 //uint64_t offset_into_file;
 // or data?
@@ -72,14 +68,14 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {

 struct ReceivingTransfer {
 struct Info {
-ContentHandle content;
+ObjectHandle content;
 // copy of info data
 // too large?
 std::vector<uint8_t> info_data;
 };

 struct Chunk {
-ContentHandle content;
+ObjectHandle content;
 std::vector<size_t> chunk_indices;
 // or data?
 // if memmapped, this would be just a pointer
@@ -93,8 +89,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
 entt::dense_map<uint64_t, entt::dense_map<uint8_t, ReceivingTransfer>> _receiving_transfers;

 // makes request rotate around open content
-std::deque<ContentHandle> _queue_content_want_info;
-std::deque<ContentHandle> _queue_content_want_chunk;
+std::deque<ObjectHandle> _queue_content_want_info;
+std::deque<ObjectHandle> _queue_content_want_chunk;

 std::atomic_bool _info_builder_dirty {false};
 std::mutex _info_builder_queue_mutex;
@@ -108,20 +104,21 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {

 static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number);

-void updateMessages(ContentHandle ce);
+void updateMessages(ObjectHandle ce);

-std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ContentHandle ce);
+std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ObjectHandle ce);

 public: // TODO: config
 bool _udp_only {false};

-size_t _max_concurrent_in {4};
-size_t _max_concurrent_out {6};
+size_t _max_concurrent_in {6};
+size_t _max_concurrent_out {8};
 // TODO: probably also includes running transfers rn (meh)
 size_t _max_pending_requests {32}; // per content

 public:
 SHA1_NGCFT1(
+ObjectStore2& os,
 Contact3Registry& cr,
 RegistryMessageModel& rmm,
 NGCFT1& nft,