23 Commits

Author SHA1 Message Date
d88c73c761 accounted 2024-10-31 12:35:32 +01:00
2a0350a564 minor tweaks and fixes
especially preventing a stall on some packetloss scenarios
2024-10-31 11:39:16 +01:00
ee593536a2 boilerplate for hs2 2024-10-30 17:12:05 +01:00
6f2fa60394 handle init2 in ft1 (hacky) 2024-10-30 11:25:33 +01:00
96041fbcec add ft1_init2 and ft1_init_ack v3 2024-10-30 11:08:46 +01:00
b4eaf86ed1 dynamically choose chunk size 2024-10-28 23:23:05 +01:00
c7485c4577 use sr 2024-10-24 14:00:16 +02:00
51396314d1 big fix for not using the found free slot index!!
minor logging and tweaking changes
2024-10-24 11:50:03 +02:00
4aab6e489d fix uint64 cast to size_t 2024-10-23 14:11:26 +02:00
5e884fd3ee dont delay recv_done and check on init info 2024-10-23 13:46:00 +02:00
9a4be575ba minor stuff and logging 2024-10-23 12:51:22 +02:00
fd094b157f more 32bit stuff 2024-10-20 18:37:01 +02:00
c3c2d0f133 fixes for 32bit 2024-10-20 16:35:55 +02:00
4360b65309 update to rmmi 2024-10-06 11:37:07 +02:00
1d7416efed fix rare assert 2024-08-08 23:52:36 +02:00
a761378dd9 add p2prng packet to ext 2024-08-07 11:07:18 +02:00
60d6f27a12 disable sending timeout assert, we believe the underlying ngcft1 2024-08-04 10:15:26 +02:00
07099e4832 tag chunkpicker for update more often 2024-08-04 10:14:59 +02:00
9e2911b36c spread chosen chunks more thinly if small files
should help (a little) if many small files with distribution
2024-08-02 13:06:55 +02:00
6da1f9afca info fixes, should investigate more 2024-07-31 18:34:14 +02:00
8bd2c925a6 fix missized local have bitset 2024-07-25 14:57:27 +02:00
da406714ff adopt to new os and message file refactor 2024-07-24 17:55:31 +02:00
16cb755191 fix dangerous unchecked file stream read 2024-07-22 19:49:02 +02:00
25 changed files with 1062 additions and 496 deletions

View File

@ -43,6 +43,21 @@ target_link_libraries(solanaceae_ngcft1 PUBLIC
########################################
add_library(solanaceae_ngchs2
./solanaceae/ngc_hs2/ngc_hs2.hpp
./solanaceae/ngc_hs2/ngc_hs2.cpp
)
target_include_directories(solanaceae_ngchs2 PUBLIC .)
target_compile_features(solanaceae_ngchs2 PUBLIC cxx_std_17)
target_link_libraries(solanaceae_ngchs2 PUBLIC
solanaceae_ngcft1
solanaceae_tox_contacts
solanaceae_message3
solanaceae_object_store
)
########################################
add_library(solanaceae_sha1_ngcft1
# hacky deps
./solanaceae/ngc_ft1_sha1/mio.hpp

View File

@ -3,9 +3,11 @@
#include <iostream>
#include <cassert>
NGCEXTEventProvider::NGCEXTEventProvider(ToxI& t, ToxEventProviderI& tep) : _t(t), _tep(tep) {
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PACKET);
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET);
NGCEXTEventProvider::NGCEXTEventProvider(ToxI& t, ToxEventProviderI& tep) : _t(t), _tep(tep), _tep_sr(_tep.newSubRef(this)) {
_tep_sr
.subscribe(Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PACKET)
.subscribe(Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET)
;
}
#define _DATA_HAVE(x, error) if ((data_size - curser) < (x)) { error; }
@ -78,7 +80,7 @@ bool NGCEXTEventProvider::parse_ft1_init(
e.file_size = 0u;
_DATA_HAVE(sizeof(e.file_size), std::cerr << "NGCEXT: packet too small, missing file_size\n"; return false)
for (size_t i = 0; i < sizeof(e.file_size); i++, curser++) {
e.file_size |= size_t(data[curser]) << (i*8);
e.file_size |= uint64_t(data[curser]) << (i*8);
}
// - 1 byte (temporary_file_tf_id)
@ -121,6 +123,83 @@ bool NGCEXTEventProvider::parse_ft1_init_ack(
);
}
bool NGCEXTEventProvider::parse_ft1_init_ack_v2(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
) {
if (!_private) {
std::cerr << "NGCEXT: ft1_init_ack_v2 cant be public\n";
return false;
}
Events::NGCEXT_ft1_init_ack e;
e.group_number = group_number;
e.peer_number = peer_number;
size_t curser = 0;
// - 1 byte (temporary_file_tf_id)
_DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
e.transfer_id = data[curser++];
// - 2 byte (max_lossy_data_size)
if ((data_size - curser) >= sizeof(e.max_lossy_data_size)) {
e.max_lossy_data_size = 0;
for (size_t i = 0; i < sizeof(e.max_lossy_data_size); i++, curser++) {
e.max_lossy_data_size |= uint16_t(data[curser]) << (i*8);
}
} else {
e.max_lossy_data_size = 500-4; // default
}
return dispatch(
NGCEXT_Event::FT1_INIT_ACK,
e
);
}
bool NGCEXTEventProvider::parse_ft1_init_ack_v3(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
) {
if (!_private) {
std::cerr << "NGCEXT: ft1_init_ack_v3 cant be public\n";
return false;
}
Events::NGCEXT_ft1_init_ack e;
e.group_number = group_number;
e.peer_number = peer_number;
size_t curser = 0;
// - 1 byte (temporary_file_tf_id)
_DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
e.transfer_id = data[curser++];
// - 2 byte (max_lossy_data_size)
if ((data_size - curser) >= sizeof(e.max_lossy_data_size)) {
e.max_lossy_data_size = 0;
for (size_t i = 0; i < sizeof(e.max_lossy_data_size); i++, curser++) {
e.max_lossy_data_size |= uint16_t(data[curser]) << (i*8);
}
} else {
e.max_lossy_data_size = 500-4; // default
}
// - 1 byte (feature_flags)
if ((data_size - curser) >= sizeof(e.feature_flags)) {
e.feature_flags = data[curser++];
} else {
e.feature_flags = 0x00; // default
}
return dispatch(
NGCEXT_Event::FT1_INIT_ACK,
e
);
}
bool NGCEXTEventProvider::parse_ft1_data(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
@ -227,41 +306,6 @@ bool NGCEXTEventProvider::parse_ft1_message(
);
}
bool NGCEXTEventProvider::parse_ft1_init_ack_v2(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
) {
if (!_private) {
std::cerr << "NGCEXT: ft1_init_ack_v2 cant be public\n";
return false;
}
Events::NGCEXT_ft1_init_ack e;
e.group_number = group_number;
e.peer_number = peer_number;
size_t curser = 0;
// - 1 byte (temporary_file_tf_id)
_DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
e.transfer_id = data[curser++];
// - 2 byte (max_lossy_data_size)
if ((data_size - curser) >= sizeof(e.max_lossy_data_size)) {
e.max_lossy_data_size = 0;
for (size_t i = 0; i < sizeof(e.max_lossy_data_size); i++, curser++) {
e.max_lossy_data_size |= uint16_t(data[curser]) << (i*8);
}
} else {
e.max_lossy_data_size = 500-4; // default
}
return dispatch(
NGCEXT_Event::FT1_INIT_ACK,
e
);
}
bool NGCEXTEventProvider::parse_ft1_have(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
@ -399,6 +443,52 @@ bool NGCEXTEventProvider::parse_ft1_have_all(
);
}
bool NGCEXTEventProvider::parse_ft1_init2(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
) {
if (!_private) {
std::cerr << "NGCEXT: ft1_init2 cant be public\n";
return false;
}
Events::NGCEXT_ft1_init2 e;
e.group_number = group_number;
e.peer_number = peer_number;
size_t curser = 0;
// - 4 byte (file_kind)
e.file_kind = 0u;
_DATA_HAVE(sizeof(e.file_kind), std::cerr << "NGCEXT: packet too small, missing file_kind\n"; return false)
for (size_t i = 0; i < sizeof(e.file_kind); i++, curser++) {
e.file_kind |= uint32_t(data[curser]) << (i*8);
}
// - 8 bytes (data size)
e.file_size = 0u;
_DATA_HAVE(sizeof(e.file_size), std::cerr << "NGCEXT: packet too small, missing file_size\n"; return false)
for (size_t i = 0; i < sizeof(e.file_size); i++, curser++) {
e.file_size |= uint64_t(data[curser]) << (i*8);
}
// - 1 byte (temporary_file_tf_id)
_DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
e.transfer_id = data[curser++];
// - 1 byte feature flags
_DATA_HAVE(sizeof(e.feature_flags), std::cerr << "NGCEXT: packet too small, missing feature_flags\n"; return false)
e.feature_flags = data[curser++];
// - X bytes (file_kind dependent id, differnt sizes)
e.file_id = {data+curser, data+curser+(data_size-curser)};
return dispatch(
NGCEXT_Event::FT1_INIT2,
e
);
}
bool NGCEXTEventProvider::parse_pc1_announce(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
@ -443,7 +533,8 @@ bool NGCEXTEventProvider::handlePacket(
return parse_ft1_init(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::FT1_INIT_ACK:
//return parse_ft1_init_ack(group_number, peer_number, data+1, data_size-1, _private);
return parse_ft1_init_ack_v2(group_number, peer_number, data+1, data_size-1, _private);
//return parse_ft1_init_ack_v2(group_number, peer_number, data+1, data_size-1, _private);
return parse_ft1_init_ack_v3(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::FT1_DATA:
return parse_ft1_data(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::FT1_DATA_ACK:
@ -456,6 +547,8 @@ bool NGCEXTEventProvider::handlePacket(
return parse_ft1_bitset(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::FT1_HAVE_ALL:
return parse_ft1_have_all(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::FT1_INIT2:
return parse_ft1_init2(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::PC1_ANNOUNCE:
return parse_pc1_announce(group_number, peer_number, data+1, data_size-1, _private);
default:
@ -699,6 +792,39 @@ bool NGCEXTEventProvider::send_ft1_have_all(
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
bool NGCEXTEventProvider::send_ft1_init2(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
uint64_t file_size,
uint8_t transfer_id,
uint8_t feature_flags,
const uint8_t* file_id, size_t file_id_size
) {
// - 1 byte packet id
// - 4 byte (file_kind)
// - 8 bytes (data size)
// - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts)
// - 1 byte (feature_flags)
// - X bytes (file_kind dependent id, differnt sizes)
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_INIT2));
for (size_t i = 0; i < sizeof(file_kind); i++) {
pkg.push_back((file_kind>>(i*8)) & 0xff);
}
for (size_t i = 0; i < sizeof(file_size); i++) {
pkg.push_back((file_size>>(i*8)) & 0xff);
}
pkg.push_back(transfer_id);
pkg.push_back(feature_flags);
for (size_t i = 0; i < file_id_size; i++) {
pkg.push_back(file_id[i]);
}
// lossless
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
static std::vector<uint8_t> build_pc1_announce(const uint8_t* id_data, size_t id_size) {
// - 1 byte packet id
// - X bytes (id, differnt sizes)

View File

@ -30,6 +30,7 @@ namespace Events {
uint32_t peer_number;
// respond to a request with 0 or more message ids, sorted by newest first
// - peer_key bytes (the msg_ids are from)
ToxKey peer_key;
@ -47,6 +48,7 @@ namespace Events {
uint32_t peer_number;
// request the other side to initiate a FT
// - 4 byte (file_kind)
uint32_t file_kind;
@ -54,11 +56,13 @@ namespace Events {
std::vector<uint8_t> file_id;
};
// DEPRECATED: use FT1_INIT2 instead
struct NGCEXT_ft1_init {
uint32_t group_number;
uint32_t peer_number;
// tell the other side you want to start a FT
// - 4 byte (file_kind)
uint32_t file_kind;
@ -81,6 +85,11 @@ namespace Events {
// - 2 byte (self_max_lossy_data_size)
uint16_t max_lossy_data_size;
// - 1 byte feature flags
// - 0x01 advertised zstd compression
// - 0x02
uint8_t feature_flags;
};
struct NGCEXT_ft1_data {
@ -88,6 +97,7 @@ namespace Events {
uint32_t peer_number;
// data fragment
// - 1 byte (temporary_file_tf_id)
uint8_t transfer_id;
@ -172,6 +182,30 @@ namespace Events {
std::vector<uint8_t> file_id;
};
struct NGCEXT_ft1_init2 {
uint32_t group_number;
uint32_t peer_number;
// tell the other side you want to start a FT
// - 4 byte (file_kind)
uint32_t file_kind;
// - 8 bytes (data size)
uint64_t file_size;
// - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts)
uint8_t transfer_id;
// - 1 byte feature flags
// - 0x01 advertise zstd compression
// - 0x02
uint8_t feature_flags;
// - X bytes (file_kind dependent id, differnt sizes)
std::vector<uint8_t> file_id;
};
struct NGCEXT_pc1_announce {
uint32_t group_number;
uint32_t peer_number;
@ -206,6 +240,7 @@ enum class NGCEXT_Event : uint8_t {
// tell the other side you want to start a FT
// TODO: might use id layer instead. with it, it would look similar to friends_ft
// DEPRECATED: use FT1_INIT2 instead
// - 4 byte (file_kind)
// - 8 bytes (data size, can be 0 if unknown, BUT files have to be atleast 1 byte)
// - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts)
@ -215,7 +250,8 @@ enum class NGCEXT_Event : uint8_t {
// acknowlage init (like an accept)
// like tox ft control continue
// - 1 byte (transfer_id)
// - 2 byte (self_max_lossy_data_size) (optional since v2)
// - 2 byte (self_max_lossy_data_size) (optimal since v2)
// - 1 byte feature flags (optimal since v3, requires prev)
FT1_INIT_ACK,
// TODO: init deny, speed up non acceptance
@ -273,8 +309,18 @@ enum class NGCEXT_Event : uint8_t {
// - X bytes (file_kind dependent id, differnt sizes)
FT1_HAVE_ALL,
// tell the other side you want to start a FT
// update: added feature flags (compression)
// - 4 byte (file_kind)
// - 8 bytes (data size, can be 0 if unknown, BUT files have to be atleast 1 byte)
// - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts)
// - 1 byte feature flags
// - X bytes (file_kind dependent id, differnt sizes)
FT1_INIT2,
// TODO: FT1_IDONTHAVE, tell a peer you no longer have said chunk
// TODO: FT1_REJECT, tell a peer you wont fulfil the request
// TODO: FT1_CANCEL, tell a peer you stop the transfer
// tell another peer that you are participating in X
// you can reply with PC1_ANNOUNCE, to let the other side know, you too are participating in X
@ -283,6 +329,9 @@ enum class NGCEXT_Event : uint8_t {
// - x bytes (id, different sizes)
PC1_ANNOUNCE = 0x80 | 32u,
// uses sub splitting
P2PRNG = 0x80 | 38u,
MAX
};
@ -299,6 +348,7 @@ struct NGCEXTEventI {
virtual bool onEvent(const Events::NGCEXT_ft1_have&) { return false; }
virtual bool onEvent(const Events::NGCEXT_ft1_bitset&) { return false; }
virtual bool onEvent(const Events::NGCEXT_ft1_have_all&) { return false; }
virtual bool onEvent(const Events::NGCEXT_ft1_init2&) { return false; }
virtual bool onEvent(const Events::NGCEXT_pc1_announce&) { return false; }
};
@ -307,6 +357,7 @@ using NGCEXTEventProviderI = EventProviderI<NGCEXTEventI>;
class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI {
ToxI& _t;
ToxEventProviderI& _tep;
ToxEventProviderI::SubscriptionReference _tep_sr;
public:
NGCEXTEventProvider(ToxI& t, ToxEventProviderI& tep);
@ -342,6 +393,18 @@ class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI {
bool _private
);
bool parse_ft1_init_ack_v2(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
);
bool parse_ft1_init_ack_v3(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
);
bool parse_ft1_data(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
@ -360,12 +423,6 @@ class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI {
bool _private
);
bool parse_ft1_init_ack_v2(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
);
bool parse_ft1_have(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
@ -384,6 +441,12 @@ class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI {
bool _private
);
bool parse_ft1_init2(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
);
bool parse_pc1_announce(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
@ -460,6 +523,15 @@ class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI {
const uint8_t* file_id, size_t file_id_size
);
bool send_ft1_init2(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
uint64_t file_size,
uint8_t transfer_id,
uint8_t feature_flags,
const uint8_t* file_id, size_t file_id_size
);
bool send_pc1_announce(
uint32_t group_number, uint32_t peer_number,
const uint8_t* id_data, size_t id_size

View File

@ -61,6 +61,10 @@ struct CCAI {
// returns -1 if not implemented, can return 0
virtual int64_t inFlightBytes(void) const { return -1; }
// returns -1 if not implemented, can return 0
// excluded timed out packets (not those currently resent)
virtual int64_t inFlightBytesAccounted(void) const { return -1; }
public: // callbacks
// data size is without overhead
virtual void onSent(SeqIDType seq, size_t data_size) = 0;

View File

@ -7,7 +7,9 @@ void CUBIC::updateReductionTimer(float time_delta) {
const auto now {getTimeNow()};
// only keep updating while the cca interaction is not too long ago
if (now - _time_point_last_update <= getCurrentDelay()*4.f) {
// or simply when there are packets in flight
// (you need space to resend timedout, which still use up pipe space)
if (!_in_flight.empty() || now - _time_point_last_update <= getCurrentDelay()*4.f) {
_time_since_reduction += time_delta;
}
}
@ -86,12 +88,15 @@ int64_t CUBIC::canSend(float time_delta) {
updateReductionTimer(time_delta);
if (fspace_pkgs == 0u) {
std::cerr << "CUBIC: flow said 0\n";
return 0u;
}
const auto window = getCWnD();
int64_t cspace_bytes = window - _in_flight_bytes;
//int64_t cspace_bytes = window - _in_flight_bytes;
int64_t cspace_bytes = window - _in_flight_bytes_accounted;
if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) {
//std::cerr << "CUBIC: cspace < seg size\n";
return 0u;
}

View File

@ -29,6 +29,25 @@ void FlowOnly::updateWindow(void) {
_fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
}
void FlowOnly::updateAccounted(void) {
int64_t size_timedout {0};
{ // can be expensive
// code see getTimeouts()
// after 3 rtt delay, we trigger timeout
const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;
for (const auto& [seq, time_stamp, size, _] : _in_flight) {
if (now_adjusted > time_stamp) {
//list.push_back(seq);
size_timedout += size;
}
}
}
_in_flight_bytes_accounted = _in_flight_bytes - size_timedout;
}
void FlowOnly::updateCongestion(void) {
updateWindow();
const auto tmp_window = getWindow();
@ -65,12 +84,15 @@ float FlowOnly::getWindow(void) const {
int64_t FlowOnly::canSend(float time_delta) {
if (_in_flight.empty()) {
assert(_in_flight_bytes == 0);
return MAXIMUM_SEGMENT_DATA_SIZE;
// TODO: should we really exit early here??
return 2*MAXIMUM_SEGMENT_DATA_SIZE;
}
updateWindow();
updateAccounted();
int64_t fspace = _fwnd - _in_flight_bytes;
//int64_t fspace = _fwnd - _in_flight_bytes;
int64_t fspace = _fwnd - _in_flight_bytes_accounted;
if (fspace < MAXIMUM_SEGMENT_DATA_SIZE) {
return 0u;
}
@ -106,6 +128,10 @@ int64_t FlowOnly::inFlightBytes(void) const {
return _in_flight_bytes;
}
int64_t FlowOnly::inFlightBytesAccounted(void) const {
return _in_flight_bytes_accounted;
}
void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
if constexpr (true) {
size_t sum {0u};

View File

@ -32,6 +32,7 @@ struct FlowOnly : public CCAI {
};
std::vector<FlyingBunch> _in_flight;
int64_t _in_flight_bytes {0};
int64_t _in_flight_bytes_accounted {0};
int32_t _consecutive_events {0};
@ -58,6 +59,8 @@ struct FlowOnly : public CCAI {
void updateWindow(void);
void updateAccounted(void);
virtual void onCongestion(void) {};
// internal logic, calls the onCongestion() event
@ -77,6 +80,7 @@ struct FlowOnly : public CCAI {
int64_t inFlightCount(void) const override;
int64_t inFlightBytes(void) const override;
int64_t inFlightBytesAccounted(void) const override;
public: // callbacks
// data size is without overhead

View File

@ -40,22 +40,24 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
} else {
// timed out, resend
std::cerr << "NGCFT1 warning: ft init timed out, resending\n";
//sendPKG_FT1_INIT(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size());
_neep.send_ft1_init(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size());
tf.inits_sent++;
tf.time_since_activity = 0.f;
}
}
//break;
return;
break;
case State::FINISHING: // we still have unacked packets
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) {
_neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
if (timeouts_set.count({idx, id})) {
if (can_packet_size >= data.size()) {
_neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
} else {
std::cerr << "NGCFT1 warning: no space to resend timedout\n";
}
}
});
if (tf.time_since_activity >= sending_give_up_after) {
@ -122,7 +124,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
size_t chunk_size = std::min<size_t>({
peer.cca->MAXIMUM_SEGMENT_DATA_SIZE,
static_cast<size_t>(can_packet_size),
tf.file_size - tf.file_size_current
static_cast<size_t>(tf.file_size - tf.file_size_current),
});
if (chunk_size == 0) {
tf.state = State::FINISHING;
@ -139,7 +141,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
group_number, peer_number,
static_cast<uint8_t>(idx),
tf.file_size_current,
new_data.data(), new_data.size(),
new_data.data(), static_cast<uint32_t>(new_data.size()),
}
);
@ -198,7 +200,6 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
}
}
//for (auto& transfer_opt : peer.recv_transfers) {
for (size_t idx = 0; idx < peer.recv_transfers.size(); idx++) {
if (!peer.recv_transfers.at(idx).has_value()) {
continue;
@ -210,13 +211,13 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
if (transfer.state == Group::Peer::RecvTransfer::State::FINISHING) {
transfer.finishing_timer -= time_delta;
if (transfer.finishing_timer <= 0.f) {
dispatch(
NGCFT1_Event::recv_done,
Events::NGCFT1_recv_done{
group_number, peer_number,
uint8_t(idx)
}
);
//dispatch(
// NGCFT1_Event::recv_done,
// Events::NGCFT1_recv_done{
// group_number, peer_number,
// uint8_t(idx)
// }
//);
peer.recv_transfers.at(idx).reset();
}
@ -251,16 +252,18 @@ NGCFT1::NGCFT1(
ToxI& t,
ToxEventProviderI& tep,
NGCEXTEventProvider& neep
) : _t(t), _tep(tep), _neep(neep)
) : _t(t), _tep(tep), _tep_sr(_tep.newSubRef(this)), _neep(neep), _neep_sr(_neep.newSubRef(this))
{
_neep.subscribe(this, NGCEXT_Event::FT1_REQUEST);
_neep.subscribe(this, NGCEXT_Event::FT1_INIT);
_neep.subscribe(this, NGCEXT_Event::FT1_INIT_ACK);
_neep.subscribe(this, NGCEXT_Event::FT1_DATA);
_neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK);
_neep.subscribe(this, NGCEXT_Event::FT1_MESSAGE);
_neep_sr
.subscribe(NGCEXT_Event::FT1_REQUEST)
.subscribe(NGCEXT_Event::FT1_INIT)
.subscribe(NGCEXT_Event::FT1_INIT_ACK)
.subscribe(NGCEXT_Event::FT1_DATA)
.subscribe(NGCEXT_Event::FT1_DATA_ACK)
.subscribe(NGCEXT_Event::FT1_MESSAGE)
;
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT);
_tep_sr.subscribe(Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT);
}
float NGCFT1::iterate(float time_delta) {
@ -306,7 +309,7 @@ float NGCFT1::iterate(float time_delta) {
void NGCFT1::NGC_FT1_send_request_private(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size
const uint8_t* file_id, uint32_t file_id_size
) {
// TODO: error check
_neep.send_ft1_request(group_number, peer_number, file_kind, file_id, file_id_size);
@ -315,9 +318,10 @@ void NGCFT1::NGC_FT1_send_request_private(
bool NGCFT1::NGC_FT1_send_init_private(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size,
size_t file_size,
uint8_t* transfer_id
const uint8_t* file_id, uint32_t file_id_size,
uint64_t file_size,
uint8_t* transfer_id,
bool can_compress
) {
if (std::get<0>(_t.toxGroupPeerGetConnectionStatus(group_number, peer_number)).value_or(TOX_CONNECTION_NONE) == TOX_CONNECTION_NONE) {
std::cerr << "NGCFT1 error: cant init ft, peer offline\n";
@ -347,6 +351,8 @@ bool NGCFT1::NGC_FT1_send_init_private(
std::cerr << "NGCFT1 error: cant init ft, no free transfer slot\n";
return false;
}
idx = i;
}
// TODO: check return value
@ -374,7 +380,7 @@ bool NGCFT1::NGC_FT1_send_message_public(
uint32_t group_number,
uint32_t& message_id,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size
const uint8_t* file_id, uint32_t file_id_size
) {
// create msg_id
message_id = randombytes_random();
@ -441,7 +447,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) {
Events::NGCFT1_recv_request{
e.group_number, e.peer_number,
static_cast<NGCFT1_file_kind>(e.file_kind),
e.file_id.data(), e.file_id.size()
e.file_id.data(), static_cast<uint32_t>(e.file_id.size())
}
);
}
@ -450,14 +456,14 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
//#if !NDEBUG
std::cout << "NGCFT1: got FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << int(e.transfer_id) << " [" << bin2hex(e.file_id) << "]\n";
//#endif
#if 0
bool accept = false;
dispatch(
NGCFT1_Event::recv_init,
Events::NGCFT1_recv_init{
e.group_number, e.peer_number,
static_cast<NGCFT1_file_kind>(e.file_kind),
e.file_id.data(), e.file_id.size(),
e.file_id.data(), static_cast<uint32_t>(e.file_id.size()),
e.transfer_id,
e.file_size,
accept
@ -486,8 +492,19 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
0u,
{} // rsb
};
return true;
#else
// HACK: simply forward to init2 hanlder
return onEvent(Events::NGCEXT_ft1_init2{
e.group_number,
e.peer_number,
e.file_kind,
e.file_size,
e.transfer_id,
0x00, // non set
e.file_id, // sadly a copy, wont matter in the future
});
#endif
}
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
@ -516,6 +533,11 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
return true;
}
if (e.max_lossy_data_size < 16) {
std::cerr << "NGCFT1 error: init_ack max_lossy_data_size is less than 16 bytes\n";
return true;
}
// negotiated packet_data_size
const auto negotiated_packet_data_size = std::min<uint32_t>(e.max_lossy_data_size, _t.toxGroupMaxCustomLossyPacketLength()-4);
// TODO: reset cca with new pkg size
@ -532,6 +554,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
//peer.cca = std::make_unique<LEDBAT>(peer.max_packet_data_size);
//peer.cca = std::make_unique<FlowOnly>(peer.max_packet_data_size);
//peer.cca->max_byterate_allowed = 1.f *1024*1024;
} else {
std::cerr << "NGCFT1: reusing cca. rtt:" << peer.cca->getCurrentDelay() << " w:" << peer.cca->getWindow() << " ifc:" << peer.cca->inFlightCount() << "\n";
}
// iterate will now call NGC_FT1_send_data_cb
@ -543,7 +567,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
#if !NDEBUG
//std::cout << "NGCFT1: got FT1_DATA\n";
//std::cout << "NGCFT1: got FT1_DATA " << e.sequence_id << "\n";
#endif
if (e.data.empty()) {
@ -578,7 +602,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
e.group_number, e.peer_number,
e.transfer_id,
transfer.file_size_current,
data.data(), data.size()
data.data(), static_cast<uint32_t>(data.size())
}
);
@ -601,7 +625,15 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
// TODO: keep around for remote timeout + delay + offset, so we can be sure all acks where received
// or implement a dedicated finished that needs to be acked
transfer.finishing_timer = 0.5f; // TODO: we are receiving, we dont know delay
transfer.finishing_timer = 0.75f; // TODO: we are receiving, we dont know delay
dispatch(
NGCFT1_Event::recv_done,
Events::NGCFT1_recv_done{
e.group_number, e.peer_number,
e.transfer_id
}
);
}
return true;
@ -620,7 +652,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
Group::Peer& peer = groups[e.group_number].peers[e.peer_number];
if (!peer.send_transfers[e.transfer_id].has_value()) {
// we delete directly, packets might still be in flight (in practice they are when ce)
//std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n";
// update: we no longer delete directly, but its kinda hacky
std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n";
return true;
}
@ -672,11 +705,55 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_message& e) {
e.group_number, e.peer_number,
e.message_id,
static_cast<NGCFT1_file_kind>(e.file_kind),
e.file_id.data(), e.file_id.size()
e.file_id.data(), static_cast<uint32_t>(e.file_id.size())
}
);
}
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init2& e) {
//#if !NDEBUG
std::cout << "NGCFT1: got FT1_INIT2 fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << int(e.transfer_id) << " ff:" << int(e.feature_flags) << " [" << bin2hex(e.file_id) << "]\n";
//#endif
bool accept = false;
dispatch(
NGCFT1_Event::recv_init,
Events::NGCFT1_recv_init{
e.group_number, e.peer_number,
static_cast<NGCFT1_file_kind>(e.file_kind),
e.file_id.data(), static_cast<uint32_t>(e.file_id.size()),
e.transfer_id,
e.file_size,
accept
}
);
if (!accept) {
std::cout << "NGCFT1: rejected init2\n";
return true; // return true?
}
_neep.send_ft1_init_ack(e.group_number, e.peer_number, e.transfer_id);
std::cout << "NGCFT1: accepted init2\n";
auto& peer = groups[e.group_number].peers[e.peer_number];
if (peer.recv_transfers[e.transfer_id].has_value()) {
std::cerr << "NGCFT1 warning: overwriting existing recv_transfer " << int(e.transfer_id) << ", other peer started new transfer on preexising\n";
}
peer.recv_transfers[e.transfer_id] = Group::Peer::RecvTransfer{
e.file_kind,
e.file_id,
Group::Peer::RecvTransfer::State::INITED,
e.file_size,
0u,
{} // rsb
};
return true;
}
bool NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
const auto group_number = tox_event_group_peer_exit_get_group_number(e);
const auto peer_number = tox_event_group_peer_exit_get_peer_id(e);

View File

@ -29,7 +29,7 @@ namespace Events {
NGCFT1_file_kind file_kind;
const uint8_t* file_id;
size_t file_id_size;
uint32_t file_id_size;
};
struct NGCFT1_recv_init {
@ -39,10 +39,10 @@ namespace Events {
NGCFT1_file_kind file_kind;
const uint8_t* file_id;
size_t file_id_size;
uint32_t file_id_size;
const uint8_t transfer_id;
const size_t file_size;
const uint64_t file_size;
// return true to accept, false to deny
bool& accept;
@ -54,9 +54,9 @@ namespace Events {
uint8_t transfer_id;
size_t data_offset;
uint64_t data_offset;
const uint8_t* data;
size_t data_size;
uint32_t data_size;
};
// request to fill data_size bytes into data
@ -66,9 +66,9 @@ namespace Events {
uint8_t transfer_id;
size_t data_offset;
uint64_t data_offset;
uint8_t* data;
size_t data_size;
uint32_t data_size;
};
struct NGCFT1_recv_done {
@ -96,7 +96,7 @@ namespace Events {
NGCFT1_file_kind file_kind;
const uint8_t* file_id;
size_t file_id_size;
uint32_t file_id_size;
};
} // Events
@ -132,7 +132,9 @@ using NGCFT1EventProviderI = EventProviderI<NGCFT1EventI>;
class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProviderI {
ToxI& _t;
ToxEventProviderI& _tep;
ToxEventProviderI::SubscriptionReference _tep_sr;
NGCEXTEventProvider& _neep; // not the interface?
NGCEXTEventProvider::SubscriptionReference _neep_sr;
std::default_random_engine _rng{std::random_device{}()};
@ -141,7 +143,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
// TODO: config
size_t acks_per_packet {3u}; // 3
float init_retry_timeout_after {4.f};
float sending_give_up_after {15.f}; // 30sec (per active transfer)
float sending_give_up_after {10.f}; // sec (per active transfer)
struct Group {
struct Peer {
@ -159,8 +161,8 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
FINISHING, // got all the data, but we wait for 2*delay, since its likely there is data still arriving
} state;
size_t file_size {0};
size_t file_size_current {0};
uint64_t file_size {0};
uint64_t file_size_current {0};
// if state FINISHING and it reaches 0, delete
float finishing_timer {0.f};
@ -188,8 +190,8 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
size_t inits_sent {1}; // is sent when creating
float time_since_activity {0.f};
size_t file_size {0};
size_t file_size_current {0};
uint64_t file_size {0};
uint64_t file_size_current {0};
// sequence array
// list of sent but not acked seq_ids
@ -224,16 +226,17 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
void NGC_FT1_send_request_private(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size
const uint8_t* file_id, uint32_t file_id_size
);
// public does not make sense here
bool NGC_FT1_send_init_private(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size,
size_t file_size,
uint8_t* transfer_id
const uint8_t* file_id, uint32_t file_id_size,
uint64_t file_size,
uint8_t* transfer_id,
bool can_compress = false // set this if you know the data is compressable (eg text)
);
// sends the message and fills in message_id
@ -241,7 +244,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
uint32_t group_number,
uint32_t& message_id,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size
const uint8_t* file_id, uint32_t file_id_size
);
public: // cca stuff
@ -268,6 +271,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
bool onEvent(const Events::NGCEXT_ft1_data&) override;
bool onEvent(const Events::NGCEXT_ft1_data_ack&) override;
bool onEvent(const Events::NGCEXT_ft1_message&) override;
bool onEvent(const Events::NGCEXT_ft1_init2&) override;
protected:
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;

View File

@ -72,5 +72,23 @@ enum class NGCFT1_file_kind : uint32_t {
// id: sha256
// always of size 16KiB, except if last piece in file
TORRENT_V2_PIECE,
// https://gist.github.com/Green-Sky/440cd9817a7114786850eb4c62dc57c3
// id: ts start, ts end
// content:
// - ts start (do we need this? when this is part of the id?)
// - ts end (same)
// - list size
// - ppk
// - mid
// - ts
HS2_INFO_RANGE_TIME = 0x00000f00,
// TODO: half open ranges
// TODO: id based
// TODO: ppk based?
// id: ppk, mid, ts
HS2_SINGLE_MESSAGE,
// TODO: message pack
};

View File

@ -1,6 +1,7 @@
#include "./sha1_mapped_filesystem.hpp"
#include <solanaceae/object_store/meta_components.hpp>
#include <solanaceae/object_store/meta_components_file.hpp>
#include "../file_constructor.hpp"
#include "../ft1_sha1_info.hpp"
@ -85,6 +86,15 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
// build info
sha1_info.file_name = file_name_;
sha1_info.file_size = file_impl->_file_size; // TODO: remove the reliance on implementation details
sha1_info.chunk_size = chunkSizeFromFileSize(sha1_info.file_size);
{
// TOOD: remove
const uint32_t cs_low {32*1024};
const uint32_t cs_high {4*1024*1024};
assert(sha1_info.chunk_size >= cs_low);
assert(sha1_info.chunk_size <= cs_high);
}
{ // build chunks
// HACK: load file fully
@ -138,10 +148,7 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
o = {_os.registry(), it_ov};
}
}
//if (self->info_to_content.count(sha1_info_hash)) {
if (static_cast<bool>(o)) {
//ce = self->info_to_content.at(sha1_info_hash);
// TODO: check if content is incomplete and use file instead
if (!o.all_of<Components::FT1InfoSHA1>()) {
o.emplace<Components::FT1InfoSHA1>(sha1_info);
@ -153,32 +160,9 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
// hash has to be set already
// Components::FT1InfoSHA1Hash
{ // lookup tables and have
auto& cc = o.get_or_emplace<Components::FT1ChunkSHA1Cache>();
cc.have_all = true;
// skip have vec, since all
//cc.have_chunk
cc.have_count = sha1_info.chunks.size(); // need?
//self->_info_to_content[sha1_info_hash] = ce;
cc.chunk_hash_to_index.clear(); // for cpy pst
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
//self->_chunks[sha1_info.chunks[i]] = ce;
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
}
}
{ // file info
// TODO: not overwrite fi? since same?
auto& file_info = o.emplace_or_replace<Message::Components::Transfer::FileInfo>();
file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
file_info.total_size = file_impl->_file_size;
o.emplace_or_replace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
}
// hmmm
o.remove<Message::Components::Transfer::TagPaused>();
// TODO: we need a replacement for this
o.remove<ObjComp::Ephemeral::File::TagTransferPaused>();
// we dont want the info anymore
o.remove<Components::ReRequestInfoTimer>();
@ -188,32 +172,33 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
o.emplace<Components::FT1InfoSHA1>(sha1_info);
o.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
{ // lookup tables and have
auto& cc = o.emplace<Components::FT1ChunkSHA1Cache>();
cc.have_all = true;
// skip have vec, since all
cc.have_count = sha1_info.chunks.size(); // need?
}
cc.chunk_hash_to_index.clear(); // for cpy pst
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
}
}
{ // lookup tables and have
auto& cc = o.get_or_emplace<Components::FT1ChunkSHA1Cache>();
// skip have vec, since all
cc.have_count = sha1_info.chunks.size(); // need?
{ // file info
auto& file_info = o.emplace<Message::Components::Transfer::FileInfo>();
file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
file_info.total_size = file_impl->_file_size;
o.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
cc.chunk_hash_to_index.clear(); // for cpy pst
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
}
}
o.emplace_or_replace<Message::Components::Transfer::File>(std::move(file_impl));
o.emplace_or_replace<ObjComp::F::TagLocalHaveAll>();
o.remove<ObjComp::F::LocalHaveBitset>();
// TODO: replace with transfers stats
if (!o.all_of<Message::Components::Transfer::BytesSent>()) {
o.emplace<Message::Components::Transfer::BytesSent>(0u);
{ // file info
// TODO: not overwrite fi? since same?
o.emplace_or_replace<ObjComp::F::SingleInfo>(file_name_, file_impl->_file_size);
o.emplace_or_replace<ObjComp::F::SingleInfoLocal>(file_path_);
o.emplace_or_replace<ObjComp::Ephemeral::FilePath>(file_path_); // ?
}
o.emplace_or_replace<Components::FT1File2>(std::move(file_impl));
if (!o.all_of<ObjComp::Ephemeral::File::TransferStats>()) {
o.emplace<ObjComp::Ephemeral::File::TransferStats>();
}
cb(o);
@ -226,23 +211,33 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
}
std::unique_ptr<File2I> SHA1MappedFilesystem::file2(Object ov, FILE2_FLAGS flags) {
if (flags & FILE2_RAW) {
std::cerr << "SHA1MF error: does not support raw modes\n";
return nullptr;
}
ObjectHandle o{_os.registry(), ov};
if (!static_cast<bool>(o)) {
return nullptr;
}
if (!o.all_of<Message::Components::Transfer::FileInfoLocal>()) {
// will this do if we go and support enc?
// use ObjComp::Ephemeral::FilePath instead??
if (!o.all_of<ObjComp::F::SingleInfoLocal>()) {
return nullptr;
}
const auto& file_list = o.get<Message::Components::Transfer::FileInfoLocal>().file_list;
if (file_list.empty()) {
const auto& file_path = o.get<ObjComp::F::SingleInfoLocal>().file_path;
if (file_path.empty()) {
return nullptr;
}
auto res = construct_file2_rw_mapped(file_list.front(), -1);
// TODO: read-only one too
// since they are mapped, is this efficent to have multiple?
auto res = construct_file2_rw_mapped(file_path, -1);
if (!res || !res->isGood()) {
std::cerr << "SHA1MF error: failed constructing mapped file '" << file_path << "'\n";
return nullptr;
}

View File

@ -32,7 +32,7 @@ struct SHA1MappedFilesystem : public StorageBackendI {
// might return pre-existing?
ObjectHandle newFromInfoHash(ByteSpan info_hash);
std::unique_ptr<File2I> file2(Object o, FILE2_FLAGS flags); // default does nothing
std::unique_ptr<File2I> file2(Object o, FILE2_FLAGS flags) override;
};
} // Backends

View File

@ -1,10 +1,11 @@
#include "./chunk_picker.hpp"
#include <solanaceae/tox_contacts/components.hpp>
#include "./components.hpp"
#include "./contact_components.hpp"
#include <solanaceae/object_store/meta_components_file.hpp>
#include "./components.hpp"
#include <algorithm>
#include <iostream>
@ -140,37 +141,39 @@ void ChunkPicker::updateParticipation(
continue;
}
if (o.all_of<Message::Components::Transfer::TagPaused>()) {
if (o.all_of<ObjComp::Ephemeral::File::TagTransferPaused>()) {
participating_unfinished.erase(o);
continue;
}
if (o.get<Components::FT1ChunkSHA1Cache>().have_all) {
if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
participating_unfinished.erase(o);
continue;
}
} else {
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
continue;
}
if (o.all_of<Message::Components::Transfer::TagPaused>()) {
if (o.all_of<ObjComp::Ephemeral::File::TagTransferPaused>()) {
continue;
}
if (!o.get<Components::FT1ChunkSHA1Cache>().have_all) {
using Priority = Components::DownloadPriority::Priority;
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
//using Priority = Components::DownloadPriority::Priority;
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
Priority prio = Priority::NORMAL;
if (o.all_of<Components::DownloadPriority>()) {
prio = o.get<Components::DownloadPriority>().p;
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
prio = o.get<ObjComp::Ephemeral::File::DownloadPriority>().p;
}
uint16_t pskips =
prio == Priority::HIGHER ? 0u :
prio == Priority::HIGHEST ? 0u :
prio == Priority::HIGH ? 1u :
prio == Priority::NORMAL ? 2u :
prio == Priority::LOW ? 4u :
8u
8u // LOWEST
;
participating_unfinished.emplace(o, ParticipationEntry{pskips});
@ -255,18 +258,19 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
ObjectHandle o {objreg, it->first};
// intersect self have with other have
if (!o.all_of<Components::RemoteHave, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
if (!o.all_of<Components::RemoteHaveBitset, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
// rare case where no one else has anything
continue;
}
const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
if (cc.have_all) {
if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n";
continue;
}
const auto& others_have = o.get<Components::RemoteHave>().others;
//const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
const auto& others_have = o.get<Components::RemoteHaveBitset>().others;
auto other_it = others_have.find(c);
if (other_it == others_have.end()) {
// rare case where the other is participating but has nothing
@ -275,7 +279,14 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
const auto& other_have = other_it->second;
BitSet chunk_candidates = cc.have_chunk;
const auto& info = o.get<Components::FT1InfoSHA1>();
const auto total_chunks = info.chunks.size();
const auto* lhb = o.try_get<ObjComp::F::LocalHaveBitset>();
// if we dont have anything, this might not exist yet
BitSet chunk_candidates = lhb == nullptr ? BitSet{total_chunks} : (lhb->have.size_bits() >= total_chunks ? lhb->have : BitSet{total_chunks});
if (!other_have.have_all) {
// AND is the same as ~(~A | ~B)
// that means we leave chunk_candidates as (have is inverted want)
@ -288,8 +299,6 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
} else {
chunk_candidates.invert();
}
const auto& info = o.get<Components::FT1InfoSHA1>();
const auto total_chunks = info.chunks.size();
auto& requested_chunks = o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
// TODO: trim off round up to 8, since they are now always set
@ -306,8 +315,8 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
// TODO: configurable
size_t start_offset {0u};
if (o.all_of<Components::ReadHeadHint>()) {
const auto byte_offset = o.get<Components::ReadHeadHint>().offset_into_file;
if (o.all_of<ObjComp::Ephemeral::File::ReadHeadHint>()) {
const auto byte_offset = o.get<ObjComp::Ephemeral::File::ReadHeadHint>().offset_into_file;
if (byte_offset <= info.file_size) {
start_offset = byte_offset/info.chunk_size;
} else {
@ -318,7 +327,8 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
//PickerStrategyRandom ps(chunk_candidates, total_chunks, _rng);
PickerStrategyRandomSequential ps(chunk_candidates, total_chunks, _rng, start_offset);
size_t out_chunk_idx {0};
while (ps.gen(out_chunk_idx) && req_ret.size() < num_requests) {
size_t req_from_this_o {0};
while (ps.gen(out_chunk_idx) && req_ret.size() < num_requests && req_from_this_o < std::max<size_t>(total_chunks/3, 1)) {
// out_chunk_idx is a potential candidate we can request form peer
// - check against double requests
@ -346,6 +356,8 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
// TODO: move this after packet was sent successfully
// (move net in? hmm)
requested_chunks[out_chunk_idx] = Components::FT1ChunkSHA1Requested::Entry{0.f, c};
req_from_this_o++;
}
}

View File

@ -1,5 +1,7 @@
#include "./components.hpp"
#include <solanaceae/object_store/meta_components_file.hpp>
namespace Components {
std::vector<size_t> FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const {
@ -11,14 +13,20 @@ std::vector<size_t> FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) cons
}
}
bool FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
if (have_all) { // short cut
bool FT1ChunkSHA1Cache::haveChunk(ObjectHandle o, const SHA1Digest& hash) const {
if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
return true;
}
const auto* lhb = o.try_get<ObjComp::F::LocalHaveBitset>();
if (lhb == nullptr) {
return false; // we dont have anything yet
}
if (auto i_vec = chunkIndices(hash); !i_vec.empty()) {
// TODO: should i test all?
return have_chunk[i_vec.front()];
//return have_chunk[i_vec.front()];
return lhb->have[i_vec.front()];
}
// not part of this file

View File

@ -3,6 +3,7 @@
#include <solanaceae/contact/components.hpp>
#include <solanaceae/message3/components.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/object_store/meta_components_file.hpp>
#include <solanaceae/util/bitset.hpp>
@ -35,18 +36,24 @@ namespace Components {
};
struct FT1ChunkSHA1Cache {
// TODO: extract have_chunk, have_all and have_count to generic comp
// TODO: extract have_count to generic comp
// have_chunk is the size of info.chunks.size(), or empty if have_all
// keep in mind bitset rounds up to 8s
BitSet have_chunk{0};
//BitSet have_chunk{0};
bool have_all {false};
size_t have_count {0};
//bool have_all {false};
size_t have_count {0}; // move?
entt::dense_map<SHA1Digest, std::vector<size_t>> chunk_hash_to_index;
std::vector<size_t> chunkIndices(const SHA1Digest& hash) const;
bool haveChunk(const SHA1Digest& hash) const;
bool haveChunk(ObjectHandle o, const SHA1Digest& hash) const;
};
struct FT1File2 {
// the cached file2 for faster access
// should be destroyed when no activity and recreated on demand
std::unique_ptr<File2I> file;
};
struct FT1ChunkSHA1Requested {
@ -63,7 +70,7 @@ namespace Components {
entt::dense_set<Contact3> participants;
};
struct RemoteHave {
struct RemoteHaveBitset {
struct Entry {
bool have_all {false};
BitSet have;
@ -92,41 +99,8 @@ namespace Components {
void lower(void);
};
struct DownloadPriority {
// download/retreival priority in comparison to other objects
// not all backends implement this
// priority can be weak, meaning low priority dls will still get transfer activity, just less often
enum class Priority {
HIGHER,
HIGH,
NORMAL,
LOW,
LOWER,
} p = Priority::NORMAL;
};
struct ReadHeadHint {
// points to the first byte we want
// this is just a hint, that can be set from outside
// to guide the sequential "piece picker" strategy
// ? the strategy *should* set this to the first byte we dont yet have
uint64_t offset_into_file {0u};
};
// this is per object/content
// more aplicable than "separated", so should be supported by most backends
struct TransferStats {
// in bytes per second
float rate_up {0.f};
float rate_down {0.f};
// bytes
uint64_t total_up {0u};
uint64_t total_down {0u};
};
struct TransferStatsSeparated {
entt::dense_map<Contact3, TransferStats> stats;
entt::dense_map<Contact3, ObjComp::Ephemeral::File::TransferStats> stats;
};
// used to populate stats
@ -134,7 +108,7 @@ namespace Components {
struct Peer {
struct Entry {
float time_point {0.f};
size_t bytes {0u};
uint64_t bytes {0u};
bool accounted {false};
};
std::deque<Entry> recently_sent;

View File

@ -8,6 +8,7 @@
#include <fstream>
#include <iostream>
#include <cstring>
#include <cassert>
struct File2RWMapped : public File2I {
mio::ummap_sink _file_map;
@ -64,8 +65,14 @@ struct File2RWMapped : public File2I {
}
ByteSpanWithOwnership read(uint64_t size, int64_t pos = -1) override {
// TODO: support streaming read
if (pos < 0) {
assert(false && "streaming not implemented");
return ByteSpan{};
}
if (pos+size > _file_size) {
//assert(false && "read past end");
assert(false && "read past end");
return ByteSpan{};
}

View File

@ -1,5 +1,8 @@
#include "./ft1_sha1_info.hpp"
// next power of two
#include <entt/core/memory.hpp>
#include <sodium.h>
SHA1Digest::SHA1Digest(const std::vector<uint8_t>& v) {
@ -28,6 +31,27 @@ std::ostream& operator<<(std::ostream& out, const SHA1Digest& v) {
return out;
}
uint32_t chunkSizeFromFileSize(uint64_t file_size) {
const uint64_t fs_low {UINT64_C(512)*1024};
const uint64_t fs_high {UINT64_C(2)*1024*1024*1024};
const uint32_t cs_low {32*1024};
const uint32_t cs_high {4*1024*1024};
if (file_size <= fs_low) { // 512kib
return cs_low; // 32kib
} else if (file_size >= fs_high) { // 2gib
return cs_high; // 4mib
}
double t = file_size - fs_low;
t /= fs_high;
double x = (1 - t) * cs_low + t * cs_high;
return entt::next_power_of_two(uint64_t(x));
}
size_t FT1InfoSHA1::chunkSize(size_t chunk_index) const {
if (chunk_index+1 == chunks.size()) {
// last chunk

View File

@ -25,21 +25,23 @@ std::ostream& operator<<(std::ostream& out, const SHA1Digest& v);
namespace std { // inject
template<> struct hash<SHA1Digest> {
std::size_t operator()(const SHA1Digest& h) const noexcept {
std::uint64_t operator()(const SHA1Digest& h) const noexcept {
return
size_t(h.data[0]) << (0*8) |
size_t(h.data[1]) << (1*8) |
size_t(h.data[2]) << (2*8) |
size_t(h.data[3]) << (3*8) |
size_t(h.data[4]) << (4*8) |
size_t(h.data[5]) << (5*8) |
size_t(h.data[6]) << (6*8) |
size_t(h.data[7]) << (7*8)
std::uint64_t(h.data[0]) << (0*8) |
std::uint64_t(h.data[1]) << (1*8) |
std::uint64_t(h.data[2]) << (2*8) |
std::uint64_t(h.data[3]) << (3*8) |
std::uint64_t(h.data[4]) << (4*8) |
std::uint64_t(h.data[5]) << (5*8) |
std::uint64_t(h.data[6]) << (6*8) |
std::uint64_t(h.data[7]) << (7*8)
;
}
};
} // std
uint32_t chunkSizeFromFileSize(uint64_t file_size);
struct FT1InfoSHA1 {
std::string file_name;
uint64_t file_size {0};

View File

@ -1,7 +1,7 @@
#include "./re_announce_systems.hpp"
#include "./components.hpp"
#include <solanaceae/message3/components.hpp>
#include <solanaceae/object_store/meta_components_file.hpp>
#include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
#include <vector>
@ -18,11 +18,12 @@ void re_announce(
std::vector<Object> to_remove;
os_reg.view<Components::ReAnnounceTimer>().each([&os_reg, &cr, &neep, &to_remove, delta](Object ov, Components::ReAnnounceTimer& rat) {
ObjectHandle o{os_reg, ov};
// if paused -> remove
if (o.all_of<Message::Components::Transfer::TagPaused>()) {
to_remove.push_back(ov);
return;
}
// TODO: pause
//// if paused -> remove
//if (o.all_of<Message::Components::Transfer::TagPaused>()) {
// to_remove.push_back(ov);
// return;
//}
// if not downloading or info incomplete -> remove
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) {
@ -31,7 +32,7 @@ void re_announce(
return;
}
if (o.get<Components::FT1ChunkSHA1Cache>().have_all) {
if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
// transfer done, we stop announcing
to_remove.push_back(ov);
return;

View File

@ -11,7 +11,8 @@ void SendingTransfers::tick(float delta) {
// if we have not heard for 10min, timeout (lower level event on real timeout)
// (2min was too little, so it seems)
// TODO: do we really need this if we get events?
if (it->second.time_since_activity >= 60.f*10.f) {
// FIXME: disabled for now, we are trusting ngcft1 for now
if (false && it->second.time_since_activity >= 60.f*10.f) {
std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n";
assert(false);
it = peer_it->second.erase(it);

View File

@ -5,7 +5,8 @@
#include <solanaceae/contact/components.hpp>
#include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/message3/components.hpp>
#include <solanaceae/tox_messages/components.hpp>
#include <solanaceae/tox_messages/msg_components.hpp>
#include <solanaceae/object_store/meta_components_file.hpp>
#include "./util.hpp"
@ -31,12 +32,6 @@
#include <filesystem>
#include <vector>
namespace Message::Components {
using Content = ObjectHandle;
} // Message::Components
static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
if (chunk_index+1 == sha1_info.chunks.size()) {
// last chunk
@ -70,30 +65,15 @@ void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_numbe
_queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, obj, hash, 0.f));
}
void SHA1_NGCFT1::updateMessages(ObjectHandle ce) {
assert(ce.all_of<Components::Messages>());
void SHA1_NGCFT1::updateMessages(ObjectHandle o) {
assert(o.all_of<Components::Messages>());
for (auto msg : ce.get<Components::Messages>().messages) {
if (ce.all_of<Message::Components::Transfer::FileInfo>() && !msg.all_of<Message::Components::Transfer::FileInfo>()) {
msg.emplace<Message::Components::Transfer::FileInfo>(ce.get<Message::Components::Transfer::FileInfo>());
}
if (ce.all_of<Message::Components::Transfer::FileInfoLocal>()) {
msg.emplace_or_replace<Message::Components::Transfer::FileInfoLocal>(ce.get<Message::Components::Transfer::FileInfoLocal>());
}
if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
msg.emplace_or_replace<Message::Components::Transfer::BytesSent>(ce.get<Message::Components::Transfer::BytesSent>());
}
if (ce.all_of<Message::Components::Transfer::BytesReceived>()) {
msg.emplace_or_replace<Message::Components::Transfer::BytesReceived>(ce.get<Message::Components::Transfer::BytesReceived>());
}
if (ce.all_of<Message::Components::Transfer::TagPaused>()) {
msg.emplace_or_replace<Message::Components::Transfer::TagPaused>();
} else {
msg.remove<Message::Components::Transfer::TagPaused>();
}
if (auto* cc = ce.try_get<Components::FT1ChunkSHA1Cache>(); cc != nullptr && cc->have_all) {
msg.emplace_or_replace<Message::Components::Transfer::TagHaveAll>();
}
for (auto msg : o.get<Components::Messages>().messages) {
msg.emplace_or_replace<Message::Components::MessageFileObject>(o);
// messages no long hold this info
// this should not update messages anymore but simply just update the object
// and receivers should listen for object updates (?)
_rmm.throwEventUpdate(msg);
}
@ -177,46 +157,93 @@ void SHA1_NGCFT1::queueBitsetSendFull(Contact3Handle c, ObjectHandle o) {
_queue_send_bitset.push_back(QBitsetEntry{c, o});
}
File2I* SHA1_NGCFT1::objGetFile2Write(ObjectHandle o) {
auto* file2_comp_ptr = o.try_get<Components::FT1File2>();
if (file2_comp_ptr == nullptr || !file2_comp_ptr->file || !file2_comp_ptr->file->can_write || !file2_comp_ptr->file->isGood()) {
// (re)request file2 from backend
auto new_file = _mfb.file2(o, StorageBackendI::FILE2_WRITE);
if (!new_file || !new_file->can_write || !new_file->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed to open object for writing\n";
return nullptr; // early out
}
file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file));
}
assert(file2_comp_ptr != nullptr);
assert(static_cast<bool>(file2_comp_ptr->file));
return file2_comp_ptr->file.get();
}
File2I* SHA1_NGCFT1::objGetFile2Read(ObjectHandle o) {
auto* file2_comp_ptr = o.try_get<Components::FT1File2>();
if (file2_comp_ptr == nullptr || !file2_comp_ptr->file || !file2_comp_ptr->file->can_read || !file2_comp_ptr->file->isGood()) {
// (re)request file2 from backend
auto new_file = _mfb.file2(o, StorageBackendI::FILE2_READ);
if (!new_file || !new_file->can_read || !new_file->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed to open object for reading\n";
return nullptr; // early out
}
file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file));
}
assert(file2_comp_ptr != nullptr);
assert(static_cast<bool>(file2_comp_ptr->file));
return file2_comp_ptr->file.get();
}
SHA1_NGCFT1::SHA1_NGCFT1(
ObjectStore2& os,
Contact3Registry& cr,
RegistryMessageModel& rmm,
RegistryMessageModelI& rmm,
NGCFT1& nft,
ToxContactModel2& tcm,
ToxEventProviderI& tep,
NGCEXTEventProvider& neep
) :
_os(os),
_os_sr(_os.newSubRef(this)),
_cr(cr),
_rmm(rmm),
_rmm_sr(_rmm.newSubRef(this)),
_nft(nft),
_nft_sr(_nft.newSubRef(this)),
_tcm(tcm),
_tep(tep),
_tep_sr(_tep.newSubRef(this)),
_neep(neep),
_neep_sr(_neep.newSubRef(this)),
_mfb(os)
{
_os_sr
// TODO: also create and destroy
_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
//_rmm.subscribe(this, RegistryMessageModel_Event::message_construct);
//_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);
// .subscribe(ObjectStore_Event::object_construct)
.subscribe(ObjectStore_Event::object_update)
// .subscribe(ObjectStore_Event::object_destroy)
;
_nft.subscribe(this, NGCFT1_Event::recv_request);
_nft.subscribe(this, NGCFT1_Event::recv_init);
_nft.subscribe(this, NGCFT1_Event::recv_data);
_nft.subscribe(this, NGCFT1_Event::send_data);
_nft.subscribe(this, NGCFT1_Event::recv_done);
_nft.subscribe(this, NGCFT1_Event::send_done);
_nft.subscribe(this, NGCFT1_Event::recv_message);
_nft_sr
.subscribe(NGCFT1_Event::recv_request)
.subscribe(NGCFT1_Event::recv_init)
.subscribe(NGCFT1_Event::recv_data)
.subscribe(NGCFT1_Event::send_data)
.subscribe(NGCFT1_Event::recv_done)
.subscribe(NGCFT1_Event::send_done)
.subscribe(NGCFT1_Event::recv_message)
;
_rmm.subscribe(this, RegistryMessageModel_Event::send_file_path);
_rmm_sr.subscribe(RegistryMessageModel_Event::send_file_path);
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_JOIN);
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT);
_tep_sr
.subscribe(Tox_Event_Type::TOX_EVENT_GROUP_PEER_JOIN)
.subscribe(Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT)
;
_neep.subscribe(this, NGCEXT_Event::FT1_HAVE);
_neep.subscribe(this, NGCEXT_Event::FT1_BITSET);
_neep.subscribe(this, NGCEXT_Event::FT1_HAVE_ALL);
_neep.subscribe(this, NGCEXT_Event::PC1_ANNOUNCE);
_neep_sr
.subscribe(NGCEXT_Event::FT1_HAVE)
.subscribe(NGCEXT_Event::FT1_BITSET)
.subscribe(NGCEXT_Event::FT1_HAVE_ALL)
.subscribe(NGCEXT_Event::PC1_ANNOUNCE)
;
}
float SHA1_NGCFT1::iterate(float delta) {
@ -247,18 +274,27 @@ float SHA1_NGCFT1::iterate(float delta) {
{ // requested info timers
std::vector<Object> timed_out;
_os.registry().view<Components::ReRequestInfoTimer>().each([delta, &timed_out](Object e, Components::ReRequestInfoTimer& rrit) {
_os.registry().view<Components::ReRequestInfoTimer>().each([delta, &timed_out](Object ov, Components::ReRequestInfoTimer& rrit) {
rrit.timer += delta;
// 15sec, TODO: config
if (rrit.timer >= 15.f) {
timed_out.push_back(e);
timed_out.push_back(ov);
}
});
for (const auto e : timed_out) {
// TODO: avoid dups
_queue_content_want_info.push_back(_os.objectHandle(e));
_os.registry().remove<Components::ReRequestInfoTimer>(e);
for (const ObjectHandle it : _queue_content_want_info) {
assert(it != e);
}
auto o = _os.objectHandle(e);
assert(!o.any_of<ObjComp::F::SingleInfo>());
assert(!o.any_of<ObjComp::F::TagLocalHaveAll>());
_queue_content_want_info.push_back(o);
//_os.registry().remove<Components::ReRequestInfoTimer>(e);
o.remove<Components::ReRequestInfoTimer>();
// TODO: throw update?
}
}
@ -289,20 +325,19 @@ float SHA1_NGCFT1::iterate(float delta) {
if (static_cast<bool>(qe.o) && static_cast<bool>(qe.c) && qe.c.all_of<Contact::Components::ToxGroupPeerEphemeral>() && qe.o.all_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Hash, Components::FT1ChunkSHA1Cache>()) {
const auto [group_number, peer_number] = qe.c.get<Contact::Components::ToxGroupPeerEphemeral>();
const auto& info_hash = qe.o.get<Components::FT1InfoSHA1Hash>().hash;
const auto& cc = qe.o.get<Components::FT1ChunkSHA1Cache>();
const auto& info = qe.o.get<Components::FT1InfoSHA1>();
const auto total_chunks = info.chunks.size();
static constexpr size_t bits_per_packet {8u*512u};
if (cc.have_all) {
if (qe.o.all_of<ObjComp::F::TagLocalHaveAll>()) {
// send have all
_neep.send_ft1_have_all(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
info_hash.data(), info_hash.size()
);
} else {
} else if (const auto* lhb = qe.o.try_get<ObjComp::F::LocalHaveBitset>(); lhb != nullptr) {
for (size_t i = 0; i < total_chunks; i += bits_per_packet) {
size_t bits_this_packet = std::min<size_t>(bits_per_packet, total_chunks-i);
@ -310,7 +345,7 @@ float SHA1_NGCFT1::iterate(float delta) {
// TODO: optimize selective copy bitset
for (size_t j = i; j < i+bits_this_packet; j++) {
if (cc.have_chunk[j]) {
if (lhb->have[j]) {
have.set(j-i);
}
}
@ -324,7 +359,7 @@ float SHA1_NGCFT1::iterate(float delta) {
have._bytes.data(), have.size_bytes()
);
}
}
} // else, we have nothing *shrug*
}
_queue_send_bitset.pop_front();
@ -466,6 +501,9 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
}
}
// in both cases, private and public, c (contact to) is the target
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c);
// create message
const auto c_self = _cr.get<Contact::Components::Self>(c).self;
if (!_cr.valid(c_self)) {
@ -479,8 +517,9 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
reg_ptr->emplace<Message::Components::Read>(msg_e, ts);
reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e);
reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
reg_ptr->emplace<Message::Components::MessageFileObject>(msg_e, o);
//reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
o.get_or_emplace<Components::Messages>().messages.push_back({*reg_ptr, msg_e});
@ -510,29 +549,27 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
_rmm.throwEventConstruct(*reg_ptr, msg_e);
// TODO: place in iterate?
updateMessages(o);
updateMessages(o); // nop // TODO: remove
}
bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
// see tox_transfer_manager.cpp for reference
if (!e.e.all_of<Message::Components::Transfer::ActionAccept, Message::Components::Content>()) {
bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
if (!e.e.all_of<ObjComp::Ephemeral::File::ActionTransferAccept>()) {
return false;
}
//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path);
auto ce = e.e.get<Message::Components::Content>();
//if (!ce.all_of<Components::FT1InfoSHA1, Components::FT1ChunkSHA1Cache>()) {
if (!ce.all_of<Components::FT1InfoSHA1>()) {
if (!e.e.all_of<Components::FT1InfoSHA1>()) {
// not ready to load yet, skip
return false;
}
assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
assert(!ce.all_of<Message::Components::Transfer::File>());
assert(!e.e.all_of<ObjComp::F::TagLocalHaveAll>());
assert(!e.e.all_of<Components::FT1ChunkSHA1Cache>());
assert(!e.e.all_of<Components::FT1File2>());
//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path);
// first, open file for write(+readback)
std::string full_file_path{e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path};
std::string full_file_path{e.e.get<ObjComp::Ephemeral::File::ActionTransferAccept>().save_to_path};
// TODO: replace with filesystem or something
// TODO: use bool in action !!!
if (full_file_path.back() != '/') {
full_file_path += "/";
}
@ -540,10 +577,10 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
// ensure dir exists
std::filesystem::create_directories(full_file_path);
const auto& info = ce.get<Components::FT1InfoSHA1>();
const auto& info = e.e.get<Components::FT1InfoSHA1>();
full_file_path += info.file_name;
ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{full_file_path});
e.e.emplace<ObjComp::F::SingleInfoLocal>(full_file_path);
const bool file_exists = std::filesystem::exists(full_file_path);
std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(full_file_path, info.file_size);
@ -551,15 +588,17 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
if (!file_impl->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
// we failed opening that filepath, so we should offer the user the oportunity to save it differently
e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop
e.e.remove<ObjComp::Ephemeral::File::ActionTransferAccept>(); // stop
return false;
}
{ // next, create chuck cache and check for existing data
auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
auto& bytes_received = ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total;
cc.have_chunk = BitSet(info.chunks.size());
cc.have_all = false;
auto& transfer_stats = e.e.get_or_emplace<ObjComp::Ephemeral::File::TransferStats>();
auto& lhb = e.e.get_or_emplace<ObjComp::F::LocalHaveBitset>();
if (lhb.have.size_bits() < info.chunks.size()) {
lhb.have = BitSet{info.chunks.size()};
}
auto& cc = e.e.emplace<Components::FT1ChunkSHA1Cache>();
cc.have_count = 0;
cc.chunk_hash_to_index.clear(); // if copy pasta
@ -576,9 +615,12 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
const bool data_equal = data_hash == info.chunks.at(i);
if (data_equal) {
cc.have_chunk.set(i);
lhb.have.set(i);
cc.have_count += 1;
bytes_received += chunk_size;
// TODO: replace with some progress counter?
// or move have_count/want_count or something?
transfer_stats.total_down += chunk_size;
//std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n";
} else {
//std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n";
@ -587,59 +629,45 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
// error reading?
}
_chunks[info.chunks[i]] = ce;
_chunks[info.chunks[i]] = e.e;
cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
}
std::cout << "preexisting " << cc.have_count << "/" << info.chunks.size() << "\n";
if (cc.have_count >= info.chunks.size()) {
cc.have_all = true;
//ce.remove<Message::Components::Transfer::BytesReceived>();
e.e.emplace_or_replace<ObjComp::F::TagLocalHaveAll>();
e.e.remove<ObjComp::F::LocalHaveBitset>();
}
} else {
for (size_t i = 0; i < info.chunks.size(); i++) {
_chunks[info.chunks[i]] = ce;
_chunks[info.chunks[i]] = e.e;
cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
}
}
}
ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
e.e.emplace_or_replace<Components::FT1File2>(std::move(file_impl));
// queue announce we are participating
// since this is the first time, we publicly announce to all
if (e.e.all_of<Message::Components::ContactFrom, Message::Components::ContactTo>()) {
const auto c_f = e.e.get<Message::Components::ContactFrom>().c;
const auto c_t = e.e.get<Message::Components::ContactTo>().c;
// queue announce that we are participating
e.e.get_or_emplace<Components::ReAnnounceTimer>(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
if (_cr.all_of<Contact::Components::ToxGroupEphemeral>(c_t)) {
// public
ce.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c_t);
} else if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c_f)) {
// private ?
ce.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c_f);
}
}
ce.get_or_emplace<Components::ReAnnounceTimer>(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
ce.remove<Message::Components::Transfer::TagPaused>();
e.e.remove<ObjComp::Ephemeral::File::TagTransferPaused>();
// start requesting from all participants
if (ce.all_of<Components::SuspectedParticipants>()) {
std::cout << "accepted ft has " << ce.get<Components::SuspectedParticipants>().participants.size() << " sp\n";
for (const auto cv : ce.get<Components::SuspectedParticipants>().participants) {
if (e.e.all_of<Components::SuspectedParticipants>()) {
std::cout << "accepted ft has " << e.e.get<Components::SuspectedParticipants>().participants.size() << " sp\n";
for (const auto cv : e.e.get<Components::SuspectedParticipants>().participants) {
_cr.emplace_or_replace<ChunkPickerUpdateTag>(cv);
}
} else {
std::cout << "accepted ft has NO sp!\n";
}
// should?
e.e.remove<Message::Components::Transfer::ActionAccept>();
e.e.remove<ObjComp::Ephemeral::File::ActionTransferAccept>();
updateMessages(ce);
updateMessages(e.e);
return false;
return false; // ?
}
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
@ -672,21 +700,21 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
// TODO: queue instead
//queueUpRequestInfo(e.group_number, e.peer_number, info_hash);
uint8_t transfer_id {0};
_nft.NGC_FT1_send_init_private(
if (_nft.NGC_FT1_send_init_private(
e.group_number, e.peer_number,
static_cast<uint32_t>(e.file_kind),
e.file_id, e.file_id_size,
o.get<Components::FT1InfoSHA1Data>().data.size(),
&transfer_id
);
_sending_transfers.emplaceInfo(
e.group_number, e.peer_number,
transfer_id,
SendingTransfers::Entry::Info{
o.get<Components::FT1InfoSHA1Data>().data
}
);
)) {
_sending_transfers.emplaceInfo(
e.group_number, e.peer_number,
transfer_id,
SendingTransfers::Entry::Info{
o.get<Components::FT1InfoSHA1Data>().data
}
);
}
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
@ -717,7 +745,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
assert(o.all_of<Components::FT1ChunkSHA1Cache>());
if (!o.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
if (!o.get<Components::FT1ChunkSHA1Cache>().haveChunk(o, chunk_hash)) {
// we dont have the chunk
return false;
}
@ -793,7 +821,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
assert(o.all_of<Components::FT1ChunkSHA1Cache>());
const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
if (cc.haveChunk(sha1_chunk_hash)) {
if (cc.haveChunk(o, sha1_chunk_hash)) {
std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
// we have the chunk
return false;
@ -840,6 +868,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
if (!_receiving_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
std::cerr << "SHA1_NGCFT1 waring: unknown transfer " << e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n";
return false;
}
@ -847,6 +876,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
transfer.time_since_activity = 0.f;
if (transfer.isInfo()) {
std::cout << "SHA1_NGCFT1: got info data " << e.data_size << "@" << e.data_offset << " from " << e.group_number << ":" << e.peer_number << "\n";
auto& info_data = transfer.getInfo().info_data;
for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
info_data[i+e.data_offset] = e.data[i];
@ -854,16 +884,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
} else if (transfer.isChunk()) {
auto o = transfer.getChunk().content;
assert(o.all_of<Message::Components::Transfer::File>());
auto* file = o.get<Message::Components::Transfer::File>().get();
assert(file != nullptr);
const auto chunk_size = o.get<Components::FT1InfoSHA1>().chunk_size;
for (const auto chunk_index : transfer.getChunk().chunk_indices) {
const auto offset_into_file = chunk_index * chunk_size;
if (!file->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
auto* file2 = objGetFile2Write(o);
if (file2 == nullptr) {
std::cerr << "SHA1_NGCFT1 error: writing file failed, no file object\n";
return false; // early out
}
if (!file2->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << entt::to_integral(o.entity()) << "@" << offset_into_file + e.data_offset << "\n";
}
}
@ -889,6 +920,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
std::cerr << "SHA1_NGCFT1 error: ngcft1 requested data for unknown transfer\n";
return false;
}
@ -903,8 +935,14 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
} else if (transfer.isChunk()) {
auto& chunk_transfer = transfer.getChunk();
const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
// TODO: should we really use file?
const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read(
auto* file2 = objGetFile2Read(chunk_transfer.content);
if (file2 == nullptr) {
// return true?
return false; // early out
}
const auto data = file2->read(
e.data_size,
(chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset
);
@ -914,7 +952,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
e.data[i] = data[i];
}
chunk_transfer.content.get_or_emplace<Message::Components::Transfer::BytesSent>().total += data.size;
// TODO: add event to propergate to messages
//_rmm.throwEventUpdate(transfer); // should we?
@ -972,9 +1009,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
{ // file info
// TODO: not overwrite fi? since same?
auto& file_info = o.emplace_or_replace<Message::Components::Transfer::FileInfo>();
file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
file_info.total_size = ft_info.file_size;
auto& file_info = o.emplace_or_replace<ObjComp::F::SingleInfo>(ft_info.file_name, ft_info.file_size);
//auto& file_info = o.emplace_or_replace<Message::Components::Transfer::FileInfo>();
//file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
//file_info.total_size = ft_info.file_size;
}
std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";
@ -984,7 +1022,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
_queue_content_want_info.erase(it);
}
o.emplace_or_replace<Message::Components::Transfer::TagPaused>();
o.emplace_or_replace<ObjComp::Ephemeral::File::TagTransferPaused>();
updateMessages(o);
} else if (transfer.isChunk()) {
@ -1000,7 +1038,12 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
const auto chunk_size = info.chunkSize(chunk_index);
assert(offset_into_file+chunk_size <= info.file_size);
const auto chunk_data = o.get<Message::Components::Transfer::File>()->read(chunk_size, offset_into_file);
auto* file2 = objGetFile2Read(o);
if (file2 == nullptr) {
// rip
return false;
}
auto chunk_data = std::move(file2->read(chunk_size, offset_into_file));
assert(!chunk_data.empty());
// check hash of chunk
@ -1008,35 +1051,43 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
if (info.chunks.at(chunk_index) == got_hash) {
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
if (!cc.have_all) {
for (const auto inner_chunk_index : transfer.getChunk().chunk_indices) {
if (!cc.have_all && !cc.have_chunk[inner_chunk_index]) {
cc.have_chunk.set(inner_chunk_index);
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
{
auto& lhb = o.get_or_emplace<ObjComp::F::LocalHaveBitset>(BitSet{info.chunks.size()});
for (const auto inner_chunk_index : transfer.getChunk().chunk_indices) {
if (lhb.have[inner_chunk_index]) {
continue;
}
// new good chunk
lhb.have.set(inner_chunk_index);
cc.have_count += 1;
// TODO: have wasted + metadata
//o.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size;
// we already tallied all of them but maybe we want to set some other progress indicator here?
if (cc.have_count == info.chunks.size()) {
// debug check
for ([[maybe_unused]] size_t i = 0; i < info.chunks.size(); i++) {
assert(cc.have_chunk[i]);
assert(lhb.have[i]);
}
cc.have_all = true;
cc.have_chunk = BitSet(0); // not wasting memory
o.emplace_or_replace<ObjComp::F::TagLocalHaveAll>();
std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";
// HACK: remap file, to clear ram
// TODO: error checking
o.get<Message::Components::Transfer::File>() = construct_file2_rw_mapped(
o.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(),
info.file_size
);
// HACK: close file2, to clear ram
// TODO: just add a lastActivity comp and close files every x minutes based on that
file2 = nullptr; // making sure we dont have a stale ptr
o.remove<Components::FT1File2>(); // will be recreated on demand
break;
}
// good chunk
// TODO: have wasted + metadata
o.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size;
}
}
if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
o.remove<ObjComp::F::LocalHaveBitset>(); // save space
}
// queue chunk have for all participants
// HACK: send immediatly to all participants
@ -1062,28 +1113,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
chunk_indices.data(), chunk_indices.size()
);
}
#if 0
if (!cc.have_all) { // debug print self have set
std::cout << "DEBUG print have bitset: s:" << cc.have_chunk.size_bits();
for (size_t i = 0; i < cc.have_chunk.size_bytes(); i++) {
if (i % 32 == 0) {
printf("\n");
}
// f cout
printf("%.2x", (uint16_t)cc.have_chunk.data()[i]);
}
printf("\n");
}
#endif
} else {
std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
}
// something happend, update chunk picker
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
//assert(static_cast<bool>(c));
// happened, went offline but chunk was still done o.o
if (static_cast<bool>(c)) {
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
} else {
// bad chunk
std::cout << "SHA1_NGCFT1: got BAD chunk from " << e.group_number << ":" << e.peer_number << " [" << info.chunks.at(chunk_index) << "] ; instead got [" << SHA1Digest{got_hash} << "]\n";
@ -1125,7 +1165,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
return false;
}
uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
uint64_t ts = Message::getTimeMS();
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
@ -1152,7 +1192,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
reg.emplace<Message::Components::ToxGroupMessageID>(new_msg_e, e.message_id);
reg.emplace<Message::Components::Transfer::TagReceiving>(new_msg_e); // add sending?
//reg.emplace<Message::Components::Transfer::TagReceiving>(new_msg_e); // add sending?
reg.emplace<Message::Components::TimestampProcessed>(new_msg_e, ts);
//reg.emplace<Components::TimestampWritten>(new_msg_e, 0);
@ -1174,59 +1214,58 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
// check if content exists
const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
ObjectHandle ce;
ObjectHandle o;
if (_info_to_content.count(sha1_info_hash)) {
ce = _info_to_content.at(sha1_info_hash);
o = _info_to_content.at(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has existing content\n";
} else {
// TODO: backend
ce = {_os.registry(), _os.registry().create()};
_info_to_content[sha1_info_hash] = ce;
o = _mfb.newObject(ByteSpan{sha1_info_hash});
_info_to_content[sha1_info_hash] = o;
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has new content\n";
//ce.emplace<Components::FT1InfoSHA1>(sha1_info);
//ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
ce.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
}
ce.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
reg_ptr->emplace<Message::Components::Content>(new_msg_e, ce);
o.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
reg_ptr->emplace<Message::Components::MessageFileObject>(new_msg_e, o);
// HACK: assume the message sender is participating. usually a safe bet.
if (addParticipation(c, ce)) {
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
// HACK: assume the message sender has all
ce.get_or_emplace<Components::RemoteHave>().others[c] = {true, {}};
if (!ce.all_of<Components::ReRequestInfoTimer>() && !ce.all_of<Components::FT1InfoSHA1>()) {
// TODO: check if already receiving
_queue_content_want_info.push_back(ce);
}
o.get_or_emplace<Components::RemoteHaveBitset>().others[c] = {true, {}};
// TODO: queue info dl
//reg_ptr->emplace<Components::FT1InfoSHA1>(e, sha1_info);
//reg_ptr->emplace<Components::FT1InfoSHA1Data>(e, sha1_info_data); // keep around? or file?
//reg.emplace<Components::FT1InfoSHA1Hash>(new_msg_e, std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size});
if (auto* cc = ce.try_get<Components::FT1ChunkSHA1Cache>(); cc != nullptr && cc->have_all) {
reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(new_msg_e);
}
if (ce.all_of<Message::Components::Transfer::FileInfo>()) {
reg_ptr->emplace<Message::Components::Transfer::FileInfo>(new_msg_e, ce.get<Message::Components::Transfer::FileInfo>());
}
if (ce.all_of<Message::Components::Transfer::FileInfoLocal>()) {
reg_ptr->emplace<Message::Components::Transfer::FileInfoLocal>(new_msg_e, ce.get<Message::Components::Transfer::FileInfoLocal>());
}
if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
reg_ptr->emplace<Message::Components::Transfer::BytesSent>(new_msg_e, ce.get<Message::Components::Transfer::BytesSent>());
}
// TODO: queue info/check if we already have info
if (!o.all_of<Components::ReRequestInfoTimer>() && !o.all_of<Components::FT1InfoSHA1>()) {
bool in_info_want {false};
for (const auto it : _queue_content_want_info) {
if (it == o) {
in_info_want = true;
break;
}
}
if (!in_info_want) {
// TODO: check if already receiving
_queue_content_want_info.push_back(o);
}
} else if (o.all_of<Components::FT1InfoSHA1>()){
// remove from info want
o.remove<Components::ReRequestInfoTimer>();
auto it = std::find(_queue_content_want_info.cbegin(), _queue_content_want_info.cend(), o);
if (it != _queue_content_want_info.cend()) {
_queue_content_want_info.erase(it);
}
}
// since public
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
_os.throwEventUpdate(o);
_rmm.throwEventConstruct(reg, new_msg_e);
@ -1249,7 +1288,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
}
// get current time unix epoch utc
uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
uint64_t ts = Message::getTimeMS();
_mfb.newFromFile(
file_name, file_path,
@ -1301,8 +1340,8 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
for (const auto& [_, o] : _info_to_content) {
removeParticipation(c, o);
if (o.all_of<Components::RemoteHave>()) {
o.get<Components::RemoteHave>().others.erase(c);
if (o.all_of<Components::RemoteHaveBitset>()) {
o.get<Components::RemoteHaveBitset>().others.erase(c);
}
}
}
@ -1353,51 +1392,59 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
// we might not know yet
if (addParticipation(c, o)) {
// something happend, update chunk picker
c.emplace_or_replace<ChunkPickerUpdateTag>();
//c.emplace_or_replace<ChunkPickerUpdateTag>();
}
auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others;
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
if (!remote_have.contains(c)) {
// init
remote_have.emplace(c, Components::RemoteHave::Entry{false, num_total_chunks});
remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
// new have? nice
// (always update on biset, not always on have)
c.emplace_or_replace<ChunkPickerUpdateTag>();
//c.emplace_or_replace<ChunkPickerUpdateTag>();
}
auto& remote_have_peer = remote_have.at(c);
if (!remote_have_peer.have_all) {
assert(remote_have_peer.have.size_bits() >= num_total_chunks);
if (remote_have_peer.have_all) {
return true; // peer somehow already had all, ignoring
}
for (const auto c_i : e.chunks) {
if (c_i >= num_total_chunks) {
std::cerr << "SHA1_NGCFT1 error: remote sent have with out-of-range chunk index!!!\n";
std::cerr << info_hash << ": " << c_i << " >= " << num_total_chunks << "\n";
continue;
}
assert(remote_have_peer.have.size_bits() >= num_total_chunks);
assert(c_i < num_total_chunks);
remote_have_peer.have.set(c_i);
bool a_valid_change {false};
for (const auto c_i : e.chunks) {
if (c_i >= num_total_chunks) {
std::cerr << "SHA1_NGCFT1 error: remote sent have with out-of-range chunk index!!!\n";
std::cerr << info_hash << ": " << c_i << " >= " << num_total_chunks << "\n";
continue;
}
// check for completion?
// TODO: optimize
bool test_all {true};
for (size_t i = 0; i < remote_have_peer.have.size_bits(); i++) {
if (!remote_have_peer.have[i]) {
test_all = false;
break;
}
}
assert(c_i < num_total_chunks);
remote_have_peer.have.set(c_i);
a_valid_change = true;
}
if (test_all) {
// optimize
remote_have_peer.have_all = true;
remote_have_peer.have = BitSet{};
if (a_valid_change) {
// new have? nice
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
// check for completion?
// TODO: optimize
bool test_all {true};
for (size_t i = 0; i < remote_have_peer.have.size_bits(); i++) {
if (!remote_have_peer.have[i]) {
test_all = false;
break;
}
}
if (test_all) {
// optimize
remote_have_peer.have_all = true;
remote_have_peer.have = BitSet{};
}
return true;
}
@ -1443,10 +1490,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
// we might not know yet
addParticipation(c, o);
auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others;
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
if (!remote_have.contains(c)) {
// init
remote_have.emplace(c, Components::RemoteHave::Entry{false, num_total_chunks});
remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
}
auto& remote_have_peer = remote_have.at(c);
@ -1472,7 +1519,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
}
// new have? nice
// (always update on bitset, not always on have)
c.emplace_or_replace<ChunkPickerUpdateTag>();
return true;
@ -1507,11 +1553,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have_all& e) {
// we might not know yet
addParticipation(c, o);
auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others;
remote_have[c] = Components::RemoteHave::Entry{true, {}};
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
remote_have[c] = Components::RemoteHaveBitset::Entry{true, {}};
// new have? nice
// (always update on have_all, not always on have)
c.emplace_or_replace<ChunkPickerUpdateTag>();
return true;

View File

@ -20,15 +20,20 @@
#include <random>
#include <chrono>
class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public NGCFT1EventI, public NGCEXTEventI {
class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public ObjectStoreEventI, public NGCFT1EventI, public NGCEXTEventI {
ObjectStore2& _os;
ObjectStore2::SubscriptionReference _os_sr;
// TODO: backend abstraction
Contact3Registry& _cr;
RegistryMessageModel& _rmm;
RegistryMessageModelI& _rmm;
RegistryMessageModelI::SubscriptionReference _rmm_sr;
NGCFT1& _nft;
NGCFT1::SubscriptionReference _nft_sr;
ToxContactModel2& _tcm;
ToxEventProviderI& _tep;
ToxEventProviderI::SubscriptionReference _tep_sr;
NGCEXTEventProvider& _neep;
NGCEXTEventProvider::SubscriptionReference _neep_sr;
Backends::SHA1MappedFilesystem _mfb;
@ -75,6 +80,9 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
void queueBitsetSendFull(Contact3Handle c, ObjectHandle o);
File2I* objGetFile2Write(ObjectHandle o);
File2I* objGetFile2Read(ObjectHandle o);
public: // TODO: config
bool _udp_only {false};
@ -85,7 +93,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
SHA1_NGCFT1(
ObjectStore2& os,
Contact3Registry& cr,
RegistryMessageModel& rmm,
RegistryMessageModelI& rmm,
NGCFT1& nft,
ToxContactModel2& tcm,
ToxEventProviderI& tep,
@ -97,7 +105,10 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts);
protected: // rmm events (actions)
bool onEvent(const Message::Events::MessageUpdated&) override;
bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override;
protected: // os events (actions)
bool onEvent(const ObjectStore::Events::ObjectUpdate&) override;
protected: // events
bool onEvent(const Events::NGCFT1_recv_request&) override;
@ -108,8 +119,6 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
bool onEvent(const Events::NGCFT1_send_done&) override;
bool onEvent(const Events::NGCFT1_recv_message&) override;
bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override;
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;

View File

@ -1,6 +1,8 @@
#include "./transfer_stats_systems.hpp"
#include "./components.hpp"
#include <solanaceae/object_store/meta_components_file.hpp>
#include <iostream>
namespace Systems {
@ -88,7 +90,7 @@ void transfer_tally_update(ObjectRegistry& os_reg, const float time_now) {
// for each stats separated -> stats (total)
os_reg.view<Components::TransferStatsSeparated, Components::TransferStatsTally>().each([&os_reg](const auto ov, Components::TransferStatsSeparated& tss_comp, const auto&) {
Components::TransferStats& stats = os_reg.get_or_emplace<Components::TransferStats>(ov);
auto& stats = os_reg.get_or_emplace<ObjComp::Ephemeral::File::TransferStats>(ov);
stats = {}; // reset
for (const auto& [_, peer_stats] : tss_comp.stats) {

View File

@ -0,0 +1,91 @@
#include "./ngc_hs2.hpp"
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
NGCHS2::NGCHS2(
ToxContactModel2& tcm,
ToxEventProviderI& tep,
NGCFT1& nft
) :
_tcm(tcm),
_tep_sr(tep.newSubRef(this)),
_nft(nft),
_nftep_sr(_nft.newSubRef(this))
{
_tep_sr
.subscribe(TOX_EVENT_GROUP_PEER_JOIN)
.subscribe(TOX_EVENT_GROUP_PEER_EXIT)
;
_nftep_sr
.subscribe(NGCFT1_Event::recv_init)
.subscribe(NGCFT1_Event::recv_request)
.subscribe(NGCFT1_Event::recv_init)
.subscribe(NGCFT1_Event::recv_data)
.subscribe(NGCFT1_Event::send_data)
.subscribe(NGCFT1_Event::recv_done)
.subscribe(NGCFT1_Event::send_done)
;
}
NGCHS2::~NGCHS2(void) {
}
float NGCHS2::iterate(float delta) {
return 1000.f;
}
bool NGCHS2::onEvent(const Events::NGCFT1_recv_request& e) {
if (
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
) {
return false; // not for us
}
return false;
}
bool NGCHS2::onEvent(const Events::NGCFT1_recv_init& e) {
if (
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
) {
return false; // not for us
}
return false;
}
bool NGCHS2::onEvent(const Events::NGCFT1_recv_data&) {
return false;
}
bool NGCHS2::onEvent(const Events::NGCFT1_send_data&) {
return false;
}
bool NGCHS2::onEvent(const Events::NGCFT1_recv_done&) {
return false;
}
bool NGCHS2::onEvent(const Events::NGCFT1_send_done&) {
return false;
}
bool NGCHS2::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
const auto group_number = tox_event_group_peer_join_get_group_number(e);
const auto peer_number = tox_event_group_peer_join_get_peer_id(e);
const auto c = _tcm.getContactGroupPeer(group_number, peer_number);
assert(c);
// add to check list with inital cooldown
return false;
}
bool NGCHS2::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
return false;
}

View File

@ -0,0 +1,44 @@
#pragma once
//#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/toxcore/tox_event_interface.hpp>
//#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
// fwd
class ToxContactModel2;
class NGCHS2 : public ToxEventI, public NGCFT1EventI {
ToxContactModel2& _tcm;
//Contact3Registry& _cr;
//RegistryMessageModelI& _rmm;
ToxEventProviderI::SubscriptionReference _tep_sr;
NGCFT1& _nft;
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
public:
NGCHS2(
ToxContactModel2& tcm,
ToxEventProviderI& tep,
NGCFT1& nf
);
~NGCHS2(void);
float iterate(float delta);
protected:
bool onEvent(const Events::NGCFT1_recv_request&) override;
bool onEvent(const Events::NGCFT1_recv_init&) override;
bool onEvent(const Events::NGCFT1_recv_data&) override;
bool onEvent(const Events::NGCFT1_send_data&) override;
bool onEvent(const Events::NGCFT1_recv_done&) override;
bool onEvent(const Events::NGCFT1_send_done&) override;
protected:
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;
};