Compare commits: bb3f907cd8...cca_rework
13 commits

SHA1:
4ee5dd6ca5
0d49752c3e
d957f9496a
f460d7b4a5
475a99054f
1c5f4f24a7
95b55c3a4a
2f9340b937
f4796397ff
bc09b5aa60
968ae94848
d2c2594a77
b9267c1c6f
CMakeLists.txt

@@ -22,8 +22,13 @@ add_library(solanaceae_ngcft1
 	./solanaceae/ngc_ft1/ngcft1.hpp
 	./solanaceae/ngc_ft1/ngcft1.cpp
 
+	./solanaceae/ngc_ft1/cca.hpp
+
+	./solanaceae/ngc_ft1/flow_only.hpp
+	./solanaceae/ngc_ft1/flow_only.cpp
 	./solanaceae/ngc_ft1/ledbat.hpp
 	./solanaceae/ngc_ft1/ledbat.cpp
+	./solanaceae/ngc_ft1/cubic.hpp
+	./solanaceae/ngc_ft1/cubic.cpp
 
 	./solanaceae/ngc_ft1/rcv_buf.hpp
 	./solanaceae/ngc_ft1/rcv_buf.cpp
@@ -39,6 +44,10 @@ target_link_libraries(solanaceae_ngcft1 PUBLIC
 ########################################
 
 add_library(solanaceae_sha1_ngcft1
+	# hacky deps
+	./solanaceae/ngc_ft1_sha1/mio.hpp
+	./solanaceae/ngc_ft1_sha1/file_rw_mapped.hpp
+
 	./solanaceae/ngc_ft1_sha1/hash_utils.hpp
 	./solanaceae/ngc_ft1_sha1/hash_utils.cpp
 
@@ -50,6 +59,7 @@ add_library(solanaceae_sha1_ngcft1
 )
 target_include_directories(solanaceae_sha1_ngcft1 PUBLIC .)
 target_compile_features(solanaceae_sha1_ngcft1 PUBLIC cxx_std_17)
+target_compile_definitions(solanaceae_sha1_ngcft1 PRIVATE WIN32_LEAN_AND_MEAN NOMINMAX)
 target_link_libraries(solanaceae_sha1_ngcft1 PUBLIC
 	solanaceae_ngcft1
 	sha1::sha1
solanaceae/ngc_ft1/cca.hpp (new file, 67 lines)

#pragma once

#include <vector>
#include <cstdint>
#include <cstddef>

// TODO: refactor, more state tracking in ccai and separate into flow and congestion algos

inline bool isSkipSeqID(const std::pair<uint8_t, uint16_t>& a, const std::pair<uint8_t, uint16_t>& b) {
	// this is not perfect, would need more ft id based history
	if (a.first != b.first) {
		return false; // we don't know
	} else {
		return a.second+1 != b.second;
	}
}

struct CCAI {
	public: // config
		using SeqIDType = std::pair<uint8_t, uint16_t>; // tf_id, seq_id

		static constexpr size_t IPV4_HEADER_SIZE {20};
		static constexpr size_t IPV6_HEADER_SIZE {40}; // bru
		static constexpr size_t UDP_HEADER_SIZE {8};

		// TODO: tcp AND IPv6 will be different
		static constexpr size_t SEGMENT_OVERHEAD {
			4+ // ft overhead
			46+ // tox?
			UDP_HEADER_SIZE+
			IPV4_HEADER_SIZE
		};

		// TODO: make configurable, set with tox ngc lossy packet size
		//const size_t MAXIMUM_SEGMENT_DATA_SIZE {1000-4};
		const size_t MAXIMUM_SEGMENT_DATA_SIZE {500-4};

		const size_t MAXIMUM_SEGMENT_SIZE {MAXIMUM_SEGMENT_DATA_SIZE + SEGMENT_OVERHEAD}; // tox 500 - 4 from ft
		//static_assert(maximum_segment_size == 574); // measured in wireshark

		// flow control
		float max_byterate_allowed {10*1024*1024}; // 10MiB/s

	public: // api
		CCAI(size_t maximum_segment_data_size) : MAXIMUM_SEGMENT_DATA_SIZE(maximum_segment_data_size) {}

		// returns the current believed window, in bytes, of how much data can be in flight
		//virtual float getCWnD(void) const = 0;

		// TODO: api for how much data we should send
		// take time since last sent into account
		// respect max_byterate_allowed
		virtual size_t canSend(void) = 0;

		// get the list of timed out seq_ids
		virtual std::vector<SeqIDType> getTimeouts(void) const = 0;

	public: // callbacks
		// data size is without overhead
		virtual void onSent(SeqIDType seq, size_t data_size) = 0;

		// TODO: copy???
		virtual void onAck(std::vector<SeqIDType> seqs) = 0;

		// if discard, not resent, not inflight
		virtual void onLoss(SeqIDType seq, bool discard) = 0;
};
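For orientation, here is a minimal, illustrative sketch of how a CCAI implementation is meant to be driven by a sender; the transfer id, sequence counter and ack/loss plumbing are invented for the example (the real driver is NGCFT1::updateSendTransfer / iteratePeer in the ngcft1.cpp diff further down):

// Hypothetical driver loop for any CCAI-derived algorithm (FlowOnly, LEDBAT, CUBIC).
#include "./cca.hpp"

void pump_once(CCAI& cca, uint8_t transfer_id, uint16_t& next_seq) {
    // how many bytes the algorithm currently allows on the wire
    size_t budget = cca.canSend();

    while (budget >= cca.MAXIMUM_SEGMENT_DATA_SIZE) {
        const CCAI::SeqIDType seq{transfer_id, next_seq++};
        // ... serialize and send one segment with MAXIMUM_SEGMENT_DATA_SIZE payload bytes ...
        cca.onSent(seq, cca.MAXIMUM_SEGMENT_DATA_SIZE);
        budget -= cca.MAXIMUM_SEGMENT_DATA_SIZE;
    }

    // on an incoming ack packet:      cca.onAck({{transfer_id, acked_seq}, ...});
    // on giving up on a segment:      cca.onLoss({transfer_id, lost_seq}, /*discard=*/false);
    // periodically, for retransmits:  for (auto seq : cca.getTimeouts()) { /* resend */ }
}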
solanaceae/ngc_ft1/cubic.cpp (new file, 62 lines)

#include "./cubic.hpp"

#include <cmath>
#include <iostream>

float CUBIC::getCWnD(void) const {
	const double K = cbrt(
		(_window_max * (1. - BETA)) / SCALING_CONSTANT
	);

	const double time_since_reduction = getTimeNow() - _time_point_reduction;

	const double TK = time_since_reduction - K;

	const double cwnd =
		SCALING_CONSTANT
		* TK * TK * TK // TK^3
		+ _window_max
	;

#if 0
	std::cout
		<< "K:" << K
		<< " ts:" << time_since_reduction
		<< " TK:" << TK
		<< " cwnd:" << cwnd
		<< " rtt:" << getCurrentDelay()
		<< "\n"
	;
#endif

	return std::max<float>(cwnd, 2.f * MAXIMUM_SEGMENT_SIZE);
}

void CUBIC::onCongestion(void) {
	if (getTimeNow() - _time_point_reduction >= getCurrentDelay()) {
		const auto current_cwnd = getCWnD();
		_time_point_reduction = getTimeNow();
		_window_max = current_cwnd;

		std::cout << "CONGESTION! cwnd:" << current_cwnd << "\n";
	}
}

size_t CUBIC::canSend(void) {
	const auto fspace_pkgs = FlowOnly::canSend();

	if (fspace_pkgs == 0u) {
		return 0u;
	}

	const int64_t cspace_bytes = getCWnD() - _in_flight_bytes;
	if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) {
		return 0u;
	}

	// limit to whole packets
	size_t cspace_pkgs = std::floor(cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE;

	return std::min(cspace_pkgs, fspace_pkgs);
}
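For reference, getCWnD() above evaluates the standard CUBIC window function (cf. RFC 8312), with SCALING_CONSTANT as C, BETA as the multiplicative-decrease factor and _window_max as the window W_max recorded at the last reduction:

K = \sqrt[3]{\frac{W_{\max}\,(1-\beta)}{C}}, \qquad W_{\text{cubic}}(t) = C\,(t - K)^{3} + W_{\max}

where t is the time since _time_point_reduction; the code additionally clamps the result to at least two maximum-sized segments.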
solanaceae/ngc_ft1/cubic.hpp (new file, 52 lines)

#pragma once

#include "./flow_only.hpp"

#include <chrono>

struct CUBIC : public FlowOnly {
	//using clock = std::chrono::steady_clock;

	public: // config
		static constexpr float BETA {0.7f};
		static constexpr float SCALING_CONSTANT {0.4f};
		static constexpr float RTT_EMA_ALPHA = 0.1f; // 0.1 is very smooth, might need more

	private:
		// window size before last reduction
		double _window_max {2.f * MAXIMUM_SEGMENT_SIZE}; // start with mss*2
		//double _window_last_max {2.f * MAXIMUM_SEGMENT_SIZE};
		double _time_point_reduction {getTimeNow()};

	private:
		float getCWnD(void) const;

		// moving avg over the last few delay samples
		// VERY sensitive to bundling acks
		//float getCurrentDelay(void) const;

		//void addRTT(float new_delay);

		void onCongestion(void) override;

	public: // api
		CUBIC(size_t maximum_segment_data_size) : FlowOnly(maximum_segment_data_size) {}

		// TODO: api for how much data we should send
		// take time since last sent into account
		// respect max_byterate_allowed
		size_t canSend(void) override;

		// get the list of timed out seq_ids
		//std::vector<SeqIDType> getTimeouts(void) const override;

	public: // callbacks
		// data size is without overhead
		//void onSent(SeqIDType seq, size_t data_size) override;

		//void onAck(std::vector<SeqIDType> seqs) override;

		// if discard, not resent, not inflight
		//void onLoss(SeqIDType seq, bool discard) override;
};
solanaceae/ngc_ft1/flow_only.cpp (new file, 160 lines)

#include "./flow_only.hpp"

#include <cmath>
#include <cassert>
#include <iostream>
#include <algorithm>

float FlowOnly::getCurrentDelay(void) const {
	return std::min(_rtt_ema, RTT_MAX);
}

void FlowOnly::addRTT(float new_delay) {
	// lerp(new_delay, rtt_ema, 0.1)
	_rtt_ema = RTT_EMA_ALPHA * new_delay + (1.f - RTT_EMA_ALPHA) * _rtt_ema;
}

void FlowOnly::updateWindow(void) {
	const float current_delay {getCurrentDelay()};

	_fwnd = max_byterate_allowed * current_delay;
	//_fwnd *= 1.3f; // try to balance the conservative algo a bit, current_delay

	_fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
}

size_t FlowOnly::canSend(void) {
	if (_in_flight.empty()) {
		assert(_in_flight_bytes == 0);
		return MAXIMUM_SEGMENT_DATA_SIZE;
	}

	updateWindow();

	const int64_t fspace = _fwnd - _in_flight_bytes;
	if (fspace < MAXIMUM_SEGMENT_DATA_SIZE) {
		return 0u;
	}

	// limit to whole packets
	size_t space = std::floor(fspace / MAXIMUM_SEGMENT_DATA_SIZE)
		* MAXIMUM_SEGMENT_DATA_SIZE;

	return space;
}

std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
	std::vector<SeqIDType> list;

	// after 3 rtt delay, we trigger timeout
	const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;

	for (const auto& [seq, time_stamp, size] : _in_flight) {
		if (now_adjusted > time_stamp) {
			list.push_back(seq);
		}
	}

	return list;
}

void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
	if constexpr (true) {
		for (const auto& it : _in_flight) {
			assert(std::get<0>(it) != seq);
		}
	}

	_in_flight.push_back({seq, getTimeNow(), data_size + SEGMENT_OVERHEAD});
	_in_flight_bytes += data_size + SEGMENT_OVERHEAD;
	//_recently_sent_bytes += data_size + SEGMENT_OVERHEAD;
}

void FlowOnly::onAck(std::vector<SeqIDType> seqs) {
	if (seqs.empty()) {
		assert(false && "got empty list of acks???");
		return;
	}

	const auto now {getTimeNow()};

	// first seq in seqs is the actual value, all extra are for redundancy
	{ // skip in ack is congestion event
		// 1. look at primary ack of packet
		auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq = seqs.front()](const auto& v) -> bool {
			return std::get<0>(v) == seq;
		});
		if (it != _in_flight.end()) {
			if (it != _in_flight.begin()) {
				// not next expected seq -> skip detected

				std::cout << "CONGESTION out of order\n";
				onCongestion();
				//if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
					//_recently_lost_data = true;
					//_last_congestion_event = getTimeNow();
					//_last_congestion_rtt = getCurrentDelay();
				//}
			} else {
				// only measure delay, if not a congestion
				addRTT(now - std::get<1>(*it));
			}
		} else {
			// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#if 0
			// assume we got a duplicated packet
			std::cout << "CONGESTION duplicate\n";
			onCongestion();
#endif
		}
	}

	for (const auto& seq : seqs) {
		auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
			return std::get<0>(v) == seq;
		});

		if (it == _in_flight.end()) {
			continue; // not found, ignore
		} else {
			//most_recent = std::max(most_recent, std::get<1>(*it));
			_in_flight_bytes -= std::get<2>(*it);
			assert(_in_flight_bytes >= 0);
			//_recently_acked_data += std::get<2>(*it);
			_in_flight.erase(it);
		}
	}
}

void FlowOnly::onLoss(SeqIDType seq, bool discard) {
	auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
		assert(!std::isnan(std::get<1>(v)));
		return std::get<0>(v) == seq;
	});

	if (it == _in_flight.end()) {
		// error
		return; // not found, ignore ??
	}

	std::cerr << "FLOW loss\n";

	// "if data lost is not to be retransmitted"
	if (discard) {
		_in_flight_bytes -= std::get<2>(*it);
		assert(_in_flight_bytes >= 0);
		_in_flight.erase(it);
	}
	// TODO: reset timestamp?

#if 0 // temporarily disable ce for timeout
	// at most once per rtt?
	// TODO: use delay at event instead
	if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
		_recently_lost_data = true;
		_last_congestion_event = getTimeNow();
		_last_congestion_rtt = getCurrentDelay();
	}
#endif
}
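The window computed in updateWindow() above is simply a bandwidth-delay product. As a worked example (the 50 ms smoothed RTT is an assumed illustrative value; only the 10 MiB/s cap comes from the code):

\text{fwnd} = \text{max\_byterate\_allowed} \times \text{RTT}_{\text{ema}} = 10 \cdot 1024 \cdot 1024\ \text{B/s} \times 0.05\ \text{s} = 524288\ \text{B}

which canSend() then rounds down to whole segments, roughly 1057 packets of 496 payload bytes.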
solanaceae/ngc_ft1/flow_only.hpp (new file, 75 lines)

#pragma once

#include "./cca.hpp"

#include <chrono>
#include <vector>
#include <tuple>

struct FlowOnly : public CCAI {
	protected:
		using clock = std::chrono::steady_clock;

	public: // config
		static constexpr float RTT_EMA_ALPHA = 0.1f; // might need over time
		static constexpr float RTT_MAX = 2.f; // 2 sec is probably too much

		//float max_byterate_allowed {100.f*1024*1024}; // 100MiB/s
		float max_byterate_allowed {10.f*1024*1024}; // 10MiB/s
		//float max_byterate_allowed {1.f*1024*1024}; // 1MiB/s
		//float max_byterate_allowed {0.6f*1024*1024}; // 600KiB/s
		//float max_byterate_allowed {0.5f*1024*1024}; // 500KiB/s
		//float max_byterate_allowed {0.05f*1024*1024}; // 50KiB/s
		//float max_byterate_allowed {0.15f*1024*1024}; // 150KiB/s

	protected:
		// initialize to low value, will get corrected very fast
		float _fwnd {0.01f * max_byterate_allowed}; // in bytes

		// rtt exponential moving average
		float _rtt_ema {0.1f};

		// list of sequence ids and timestamps of when they were sent (and payload size)
		std::vector<std::tuple<SeqIDType, float, size_t>> _in_flight;
		int64_t _in_flight_bytes {0};

		clock::time_point _time_start_offset;

	protected:
		// make values relative to algo start for readability (and precision)
		// get timestamp in seconds
		double getTimeNow(void) const {
			return std::chrono::duration<double>{clock::now() - _time_start_offset}.count();
		}

		// moving avg over the last few delay samples
		// VERY sensitive to bundling acks
		float getCurrentDelay(void) const;

		void addRTT(float new_delay);

		void updateWindow(void);

		virtual void onCongestion(void) {};

	public: // api
		FlowOnly(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {}

		// TODO: api for how much data we should send
		// take time since last sent into account
		// respect max_byterate_allowed
		size_t canSend(void) override;

		// get the list of timed out seq_ids
		std::vector<SeqIDType> getTimeouts(void) const override;

	public: // callbacks
		// data size is without overhead
		void onSent(SeqIDType seq, size_t data_size) override;

		void onAck(std::vector<SeqIDType> seqs) override;

		// if discard, not resent, not inflight
		void onLoss(SeqIDType seq, bool discard) override;
};
solanaceae/ngc_ft1/ledbat.cpp

@@ -15,11 +15,11 @@
 
 inline constexpr bool PLOTTING = false;
 
-LEDBAT::LEDBAT(size_t maximum_segment_data_size) : MAXIMUM_SEGMENT_DATA_SIZE(maximum_segment_data_size) {
+LEDBAT::LEDBAT(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_size) {
 	_time_start_offset = clock::now();
 }
 
-size_t LEDBAT::canSend(void) const {
+size_t LEDBAT::canSend(void) {
 	if (_in_flight.empty()) {
 		return MAXIMUM_SEGMENT_DATA_SIZE;
 	}
@@ -68,6 +68,11 @@ void LEDBAT::onSent(SeqIDType seq, size_t data_size) {
 }
 
 void LEDBAT::onAck(std::vector<SeqIDType> seqs) {
+	if (seqs.empty()) {
+		assert(false && "got empty list of acks???");
+		return;
+	}
+
 	// only take the smallest value
 	float most_recent {-std::numeric_limits<float>::infinity()};
 
@@ -75,6 +80,23 @@ void LEDBAT::onAck(std::vector<SeqIDType> seqs) {
 
 	const auto now {getTimeNow()};
 
+	{ // skip in ack is congestion event
+		// 1. look at primary ack of packet
+		auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq = seqs.front()](const auto& v) -> bool {
+			return std::get<0>(v) == seq;
+		});
+		if (it != _in_flight.end()) {
+			if (isSkipSeqID(_last_ack_got, std::get<0>(*it))) {
+				if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
+					_recently_lost_data = true;
+					_last_congestion_event = getTimeNow();
+					_last_congestion_rtt = getCurrentDelay();
+				}
+			}
+			// TODO: only if newer, triggers double event otherwise (without a timer)
+			_last_ack_got = std::get<0>(*it);
+		}
+	}
 	for (const auto& seq : seqs) {
 		auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
 			return std::get<0>(v) == seq;
@@ -112,10 +134,6 @@ void LEDBAT::onLoss(SeqIDType seq, bool discard) {
 		return; // not found, ignore ??
 	}
 
-	_recently_lost_data = true;
-
-	// at most once per rtt?
-
 	if (PLOTTING) {
 		std::cerr << "CCA: onLoss: TIME: " << getTimeNow() << "\n";
 	}
@@ -128,6 +146,16 @@ void LEDBAT::onLoss(SeqIDType seq, bool discard) {
 	}
 	// TODO: reset timestamp?
+
+#if 0 // temporarily disable ce for timeout
+	// at most once per rtt?
+	// TODO: use delay at event instead
+	if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) {
+		_recently_lost_data = true;
+		_last_congestion_event = getTimeNow();
+		_last_congestion_rtt = getCurrentDelay();
+	}
+#endif
 
 	updateWindows();
 }
@@ -201,7 +229,7 @@ void LEDBAT::updateWindows(void) {
 
 	if (_recently_lost_data) {
 		_cwnd = std::clamp(
-			_cwnd / 2.f,
+			_cwnd * 0.7f,
 			//_cwnd / 1.6f,
 			2.f * MAXIMUM_SEGMENT_SIZE,
 			_cwnd
@@ -209,7 +237,7 @@ void LEDBAT::updateWindows(void) {
 	} else {
 		// LEDBAT++ (the Rethinking the LEDBAT Protocol paper)
 		// "Multiplicative decrease"
-		const float constant {2.f}; // spec recs 1
+		const float constant {1.f}; // spec recs 1
 		if (queuing_delay < target_delay) {
 			_cwnd = std::min(
 				_cwnd + gain,
solanaceae/ngc_ft1/ledbat.hpp

@@ -1,5 +1,7 @@
 #pragma once
 
+#include "./cca.hpp"
+
 #include <chrono>
 #include <deque>
 #include <vector>
@@ -9,8 +11,9 @@
 // LEDBAT++: https://www.ietf.org/archive/id/draft-irtf-iccrg-ledbat-plus-plus-01.txt
 
 // LEDBAT++ implementation
-struct LEDBAT {
+struct LEDBAT : public CCAI{
 	public: // config
+#if 0
 		using SeqIDType = std::pair<uint8_t, uint16_t>; // tf_id, seq_id
 
 		static constexpr size_t IPV4_HEADER_SIZE {20};
@@ -32,6 +35,7 @@ struct LEDBAT {
 		//static constexpr size_t maximum_segment_size {496 + segment_overhead}; // tox 500 - 4 from ft
 		const size_t MAXIMUM_SEGMENT_SIZE {MAXIMUM_SEGMENT_DATA_SIZE + SEGMENT_OVERHEAD}; // tox 500 - 4 from ft
 		//static_assert(maximum_segment_size == 574); // mesured in wireshark
+#endif
 
 		// ledbat++ says 60ms, we might need other values if relayed
 		//const float target_delay {0.060f};
@@ -57,19 +61,19 @@ struct LEDBAT {
 		// TODO: api for how much data we should send
 		// take time since last sent into account
 		// respect max_byterate_allowed
-		size_t canSend(void) const;
+		size_t canSend(void) override;
 
 		// get the list of timed out seq_ids
-		std::vector<SeqIDType> getTimeouts(void) const;
+		std::vector<SeqIDType> getTimeouts(void) const override;
 
 	public: // callbacks
 		// data size is without overhead
-		void onSent(SeqIDType seq, size_t data_size);
+		void onSent(SeqIDType seq, size_t data_size) override;
 
-		void onAck(std::vector<SeqIDType> seqs);
+		void onAck(std::vector<SeqIDType> seqs) override;
 
 		// if discard, not resent, not inflight
-		void onLoss(SeqIDType seq, bool discard);
+		void onLoss(SeqIDType seq, bool discard) override;
 
 	private:
 		using clock = std::chrono::steady_clock;
@@ -96,9 +100,12 @@ struct LEDBAT {
 
 		float _last_cwnd {0.f}; // timepoint of last cwnd correction
 		int64_t _recently_acked_data {0}; // reset on _last_cwnd
-		bool _recently_lost_data {false};
 		int64_t _recently_sent_bytes {0};
 
+		bool _recently_lost_data {false};
+		float _last_congestion_event {0.f};
+		float _last_congestion_rtt {0.5f};
+
 		// initialize to low value, will get corrected very fast
 		float _fwnd {0.01f * max_byterate_allowed}; // in bytes
 
@@ -116,6 +123,8 @@ struct LEDBAT {
 
 		int64_t _in_flight_bytes {0};
 
+		SeqIDType _last_ack_got {0xff, 0xffff}; // some default
+
 	private: // helper
 		clock::time_point _time_start_offset;
 };
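A quick illustration of the skip detection that the new _last_ack_got field feeds into (isSkipSeqID is the helper from cca.hpp above; the values are made up):

#include "./cca.hpp"
#include <cassert>

int main(void) {
    assert(!isSkipSeqID({0, 5}, {0, 6})); // consecutive seq ids on the same transfer: no skip
    assert( isSkipSeqID({0, 5}, {0, 7})); // gap on the same transfer: skip, treated as a congestion hint
    assert(!isSkipSeqID({0, 5}, {1, 6})); // different transfer ids: unknown, not reported as a skip
    return 0;
}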
solanaceae/ngc_ft1/ngcft1.cpp

@@ -143,7 +143,7 @@ bool NGCFT1::sendPKG_FT1_MESSAGE(
 	return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK;
 }
 
-void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<LEDBAT::SeqIDType>& timeouts_set) {
+void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set) {
 	auto& tf_opt = peer.send_transfers.at(idx);
 	assert(tf_opt.has_value());
 	auto& tf = tf_opt.value();
@@ -309,7 +309,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
 
 void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
 	auto timeouts = peer.cca->getTimeouts();
-	std::set<LEDBAT::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
+	std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
 
 	for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) {
 		if (peer.send_transfers.at(idx).has_value()) {
@@ -563,7 +563,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
 	}
 
 	// send acks
-	std::vector<uint16_t> ack_seq_ids(transfer.rsb.ack_seq_ids.cbegin(), transfer.rsb.ack_seq_ids.cend());
+	// reverse, last seq is most recent
+	std::vector<uint16_t> ack_seq_ids(transfer.rsb.ack_seq_ids.crbegin(), transfer.rsb.ack_seq_ids.crend());
 	// TODO: check if this caps at max acks
 	if (!ack_seq_ids.empty()) {
 		// TODO: check return value
@@ -588,7 +589,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
 
 bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
 #if !NDEBUG
-	std::cout << "NGCFT1: FT1_DATA_ACK\n";
+	//std::cout << "NGCFT1: FT1_DATA_ACK\n";
 #endif
 
 	if (!groups.count(e.group_number)) {
@@ -610,20 +611,17 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
 		return true;
 	}
 
-	//if ((length - curser) % sizeof(uint16_t) != 0) {
-		//fprintf(stderr, "FT: data_ack with misaligned data\n");
-		//return;
-	//}
-
 	transfer.time_since_activity = 0.f;
 
-	std::vector<LEDBAT::SeqIDType> seqs;
-	for (const auto it : e.sequence_ids) {
-		// TODO: improve this o.o
-		seqs.push_back({e.transfer_id, it});
-		transfer.ssb.erase(it);
+	{
+		std::vector<CCAI::SeqIDType> seqs;
+		for (const auto it : e.sequence_ids) {
+			// TODO: improve this o.o
+			seqs.push_back({e.transfer_id, it});
+			transfer.ssb.erase(it);
+		}
+		peer.cca->onAck(std::move(seqs));
 	}
-	peer.cca->onAck(seqs);
 
 	// delete if all packets acked
 	if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) {
@@ -635,6 +633,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
 				e.transfer_id,
 			}
 		);
+		// TODO: check for FINISHING state
 		peer.send_transfers[e.transfer_id].reset();
 	}
 
@@ -712,7 +711,7 @@ bool NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
 	}
 
 	// reset cca
-	peer.cca = std::make_unique<LEDBAT>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
+	peer.cca = std::make_unique<CUBIC>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
 
 	return false;
 }
solanaceae/ngc_ft1/ngcft1.hpp

@@ -6,7 +6,9 @@
 #include <solanaceae/toxcore/tox_event_interface.hpp>
 
 #include <solanaceae/ngc_ext/ngcext.hpp>
-#include "./ledbat.hpp"
+#include "./cubic.hpp"
+//#include "./flow_only.hpp"
+//#include "./ledbat.hpp"
 
 #include "./rcv_buf.hpp"
 #include "./snd_buf.hpp"
@@ -139,7 +141,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
 
 	struct Group {
 		struct Peer {
-			std::unique_ptr<LEDBAT> cca = std::make_unique<LEDBAT>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
+			std::unique_ptr<CCAI> cca = std::make_unique<CUBIC>(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4
 
 			struct RecvTransfer {
 				uint32_t file_kind;
@@ -199,7 +201,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
 		bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size);
 		bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size);
 
-		void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<LEDBAT::SeqIDType>& timeouts_set);
+		void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set);
 		void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
 
 	public:
solanaceae/ngc_ft1_sha1/file_rw_mapped.hpp (new file, 58 lines)

#pragma once

#include <solanaceae/message3/file.hpp>

#include "./mio.hpp"

#include <filesystem>
#include <fstream>
#include <cstring>

struct FileRWMapped : public FileI {
	mio::ummap_sink _file_map;

	// TODO: add truncate support?
	FileRWMapped(std::string_view file_path, uint64_t file_size) {
		_file_size = file_size;

		if (!std::filesystem::exists(file_path)) {
			std::ofstream(std::string{file_path}) << '\0'; // force create the file
		}
		std::filesystem::resize_file(file_path, file_size); // ensure size, usually sparse

		std::error_code err;
		// sink, is also read
		_file_map.map(std::string{file_path}, 0, file_size, err);

		if (err) {
			// TODO: error
			return;
		}
	}

	virtual ~FileRWMapped(void) override {}

	bool isGood(void) override {
		return _file_map.is_mapped();
	}

	std::vector<uint8_t> read(uint64_t pos, uint64_t size) override {
		if (pos+size > _file_size) {
			//assert(false && "read past end");
			return {};
		}

		return {_file_map.data()+pos, _file_map.data()+(pos+size)};
	}

	bool write(uint64_t pos, const std::vector<uint8_t>& data) override {
		if (pos+data.size() > _file_size) {
			return false;
		}

		std::memcpy(_file_map.data()+pos, data.data(), data.size());

		return true;
	}
};
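A minimal usage sketch for the mapped-file wrapper above; the path and sizes are invented for illustration, and it assumes FileI (from solanaceae/message3/file.hpp) provides the _file_size member used in the constructor:

#include "./file_rw_mapped.hpp"
#include <cassert>
#include <vector>

int main(void) {
    FileRWMapped file{"/tmp/example.bin", 1024*1024}; // hypothetical path and size
    assert(file.isGood());

    std::vector<uint8_t> chunk(496, 0x42);   // one segment-sized payload
    assert(file.write(128*1024, chunk));     // write at an arbitrary offset

    const auto back = file.read(128*1024, chunk.size());
    assert(back == chunk);                   // mapped reads see the written bytes
    return 0;
}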
solanaceae/ngc_ft1_sha1/ft1_sha1_info.cpp

@@ -31,7 +31,7 @@ std::ostream& operator<<(std::ostream& out, const SHA1Digest& v) {
 size_t FT1InfoSHA1::chunkSize(size_t chunk_index) const {
 	if (chunk_index+1 == chunks.size()) {
 		// last chunk
-		return file_size - chunk_index * chunk_size;
+		return file_size - (uint64_t(chunk_index) * uint64_t(chunk_size));
 	} else {
 		return chunk_size;
 	}
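The widened multiplication above presumably guards against 32-bit overflow of the chunk offset on large files: for example, with an (illustrative) 128 KiB chunk_size, chunk_index 40000 gives 40000 * 131072 = 5,242,880,000 bytes, which no longer fits in 32 bits.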
solanaceae/ngc_ft1_sha1/mio.hpp (new file, 1763 lines)
File diff suppressed because it is too large.
solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp

@@ -1,6 +1,5 @@
 #include "./sha1_ngcft1.hpp"
 
-#include <mutex>
 #include <solanaceae/toxcore/utils.hpp>
 
 #include <solanaceae/contact/components.hpp>
@@ -9,7 +8,6 @@
 #include <solanaceae/tox_messages/components.hpp>
 
 #include <solanaceae/message3/file_r_file.hpp>
-#include <solanaceae/message3/file_rw_file.hpp>
 
 #include "./ft1_sha1_info.hpp"
 #include "./hash_utils.hpp"
@@ -18,9 +16,12 @@
 
 #include <entt/container/dense_set.hpp>
 
+#include "./file_rw_mapped.hpp"
+
 #include <iostream>
 #include <variant>
 #include <filesystem>
+#include <mutex>
 #include <future>
 
 namespace Message::Components {
@@ -533,13 +534,10 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 
 	ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{full_file_path});
 
-	std::unique_ptr<FileRWFile> file_impl;
+	std::unique_ptr<FileRWMapped> file_impl;
 	const bool file_exists = std::filesystem::exists(full_file_path);
 
-	{
-		const bool truncate = !file_exists;
-		file_impl = std::make_unique<FileRWFile>(full_file_path, info.file_size, truncate);
-	}
+	file_impl = std::make_unique<FileRWMapped>(full_file_path, info.file_size);
 
 	if (!file_impl->isGood()) {
 		std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
@@ -549,6 +547,7 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 
 	{ // next, create chunk cache and check for existing data
 		auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
+		auto& bytes_received = ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total;
 		cc.have_all = false;
 		cc.have_count = 0;
 
@@ -557,21 +556,33 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 		if (file_exists) {
 			// iterate existing file
 			for (size_t i = 0; i < info.chunks.size(); i++) {
-				auto existing_data = file_impl->read(i*info.chunk_size, info.chunkSize(i));
+				const uint64_t chunk_size = info.chunkSize(i);
+				auto existing_data = file_impl->read(i*uint64_t(info.chunk_size), chunk_size);
+				assert(existing_data.size() == chunk_size);
+
 				// TODO: avoid copy
-				cc.have_chunk.push_back(
-					SHA1Digest{hash_sha1(existing_data.data(), existing_data.size())} == info.chunks.at(i)
-				);
-				if (cc.have_chunk.back()) {
+
+				const auto data_hash = SHA1Digest{hash_sha1(existing_data.data(), existing_data.size())};
+				const bool data_equal = data_hash == info.chunks.at(i);
+
+				cc.have_chunk.push_back(data_equal);
+
+				if (data_equal) {
 					cc.have_count += 1;
+					bytes_received += chunk_size;
+					//std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n";
+				} else {
+					//std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n";
 				}
 
 				_chunks[info.chunks[i]] = ce;
 				cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
 			}
+			std::cout << "preexisting " << cc.have_count << "/" << info.chunks.size() << "\n";
 
 			if (cc.have_count >= info.chunks.size()) {
 				cc.have_all = true;
+				//ce.remove<Message::Components::Transfer::BytesReceived>();
 			}
 		} else {
 			for (size_t i = 0; i < info.chunks.size(); i++) {
@@ -774,6 +785,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
 	}
 
 	auto& tv = peer_transfers[e.transfer_id].v;
+	peer_transfers[e.transfer_id].time_since_activity = 0.f;
 	if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
 		auto& info_data = std::get<ReceivingTransfer::Info>(tv).info_data;
 		for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
@@ -791,7 +803,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
 
 			// TODO: avoid temporary copy
 			// TODO: check return
-			file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size});
+			if (!file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size})) {
+				std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
+			}
 		}
 	} else {
 		assert(false && "unhandled case");
@@ -812,6 +826,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
 	}
 
 	auto& transfer = peer.at(e.transfer_id);
+	transfer.time_since_activity = 0.f;
 	if (std::holds_alternative<SendingTransfer::Info>(transfer.v)) {
 		auto& info_transfer = std::get<SendingTransfer::Info>(transfer.v);
 		for (size_t i = 0; i < e.data_size && (i + e.data_offset) < info_transfer.info_data.size(); i++) {
@@ -826,7 +841,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
 		auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
 		const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
 		// TODO: should we really use file?
-		const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read((chunk_transfer.chunk_index * info.chunk_size) + e.data_offset, e.data_size);
+		const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read((chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset, e.data_size);
 
 		// TODO: optimize
 		for (size_t i = 0; i < e.data_size && i < data.size(); i++) {
@@ -845,8 +860,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
 		assert(false && "not implemented?");
 	}
 
-	transfer.time_since_activity = 0.f;
-
 	return true;
 }
 
@@ -910,7 +923,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 
 		// HACK: only check first chunk (they *should* all be the same)
 		const auto chunk_index = std::get<ReceivingTransfer::Chunk>(tv).chunk_indices.front();
-		const auto offset_into_file = chunk_index * info.chunk_size;
+		const uint64_t offset_into_file = chunk_index * uint64_t(info.chunk_size);
 
 		assert(chunk_index < info.chunks.size());
 		const auto chunk_size = info.chunkSize(chunk_index);
@@ -923,12 +936,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 		if (info.chunks.at(chunk_index) == got_hash) {
 			std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
 
-			// remove from requested
-			// TODO: remove at init and track running transfers differently
-			for (const auto it : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
-				ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
-			}
-
 			if (!cc.have_all) {
 				for (const auto inner_chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
 					if (!cc.have_all && !cc.have_chunk.at(inner_chunk_index)) {
@@ -943,6 +950,14 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 					cc.have_all = true;
 					cc.have_chunk.clear(); // not wasting memory
 					std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";
+
+					// HACK: remap file, to clear ram
+
+					// TODO: error checking
+					ce.get<Message::Components::Transfer::File>() = std::make_unique<FileRWMapped>(
+						ce.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(),
+						info.file_size
+					);
 				}
 
 				// good chunk
@@ -955,7 +970,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 			}
 		} else {
 			// bad chunk
-			// TODO: requeue?
+			std::cout << "SHA1_NGCFT1: got BAD chunk from " << e.group_number << ":" << e.peer_number << " [" << info.chunks.at(chunk_index) << "] ; instead got [" << SHA1Digest{got_hash} << "]\n";
+		}
+
+		// remove from requested
+		// TODO: remove at init and track running transfers differently
+		for (const auto it : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
+			ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
 		}
 
 		updateMessages(ce); // mostly for received bytes