17 Commits

Author SHA1 Message Date
33560f8f8a receiving transfers refactor 2024-06-30 14:03:06 +02:00
3286a7228c more minor refactoring 2024-06-28 22:18:11 +02:00
b53e291c68 wip chunk picker (still unused) and a small refactor 2024-06-28 15:13:17 +02:00
27cade4dfe track remote have and bitset 2024-06-25 21:09:46 +02:00
0b4041db7e move bitset to util 2024-06-25 12:45:28 +02:00
e9e38db1d5 move self have_chunk to bitset 2024-06-25 12:08:17 +02:00
c8619561ec refactor: move (object/content) components out 2024-06-24 16:42:23 +02:00
1b630bc07f impl and test bitset util 2024-06-24 12:14:51 +02:00
ee2411b8e0 hack: send ft1_have every chunk we receive
produces unnecessary overhead, should be bundled
2024-06-23 15:12:31 +02:00
bc7417c1cd add send fn for new packets (parse and send still untested) 2024-06-23 12:55:23 +02:00
3827733f08 and remove the old code 2024-06-23 12:31:01 +02:00
5400c13f88 copy the remaining implemented send functions over 2024-06-23 12:14:02 +02:00
8972386971 send out pc1 announces for ft infohash
will eliminate the guesswork in the future
2024-06-23 10:17:48 +02:00
b27107af4c start moving pkg sending to ngcext
wip, but working as far as it's implemented
2024-06-23 10:14:03 +02:00
bcde244a3c handle pc1 announce and reduce chance to sample random peer
(will remove random sample sometime in the future)
2024-06-22 17:01:52 +02:00
e9f22bc9ae make ft1sha1 observe disconnects 2024-06-22 14:08:12 +02:00
c09f2e6f8f ngcext: parse ft1_have, ft1_bitset, pc1_announce 2024-06-22 12:48:54 +02:00
16 changed files with 1556 additions and 401 deletions

.gitignore vendored Normal file
View File

@ -0,0 +1,22 @@
.vs/
*.o
*.swp
~*
*~
.idea/
cmake-build-debug/
cmake-build-debugandtest/
cmake-build-release/
*.stackdump
*.coredump
compile_commands.json
/build*
.clangd
.cache
.DS_Store
.AppleDouble
.LSOverride
CMakeLists.txt.user*
CMakeCache.txt

View File

@ -54,6 +54,17 @@ add_library(solanaceae_sha1_ngcft1
./solanaceae/ngc_ft1_sha1/ft1_sha1_info.hpp
./solanaceae/ngc_ft1_sha1/ft1_sha1_info.cpp
./solanaceae/ngc_ft1_sha1/components.hpp
./solanaceae/ngc_ft1_sha1/components.cpp
./solanaceae/ngc_ft1_sha1/chunk_picker.hpp
./solanaceae/ngc_ft1_sha1/participation.hpp
./solanaceae/ngc_ft1_sha1/participation.cpp
./solanaceae/ngc_ft1_sha1/receiving_transfers.hpp
./solanaceae/ngc_ft1_sha1/receiving_transfers.cpp
./solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
./solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
)
@ -69,3 +80,22 @@ target_link_libraries(solanaceae_sha1_ngcft1 PUBLIC
solanaceae_file2
)
########################################
option(SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING "Build the solanaceae_ngcft1_sha1 tests" OFF)
message("II SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING " ${SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING})
# TODO: proper options n shit
if (SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING)
include(CTest)
#add_executable(bitset_tests
# ./solanaceae/ngc_ft1_sha1/bitset_tests.cpp
#)
#target_link_libraries(bitset_tests PUBLIC
# solanaceae_sha1_ngcft1
#)
endif()

View File

@ -1,8 +1,9 @@
#include "./ngcext.hpp"
#include <iostream>
#include <cassert>
NGCEXTEventProvider::NGCEXTEventProvider(ToxEventProviderI& tep) : _tep(tep) {
NGCEXTEventProvider::NGCEXTEventProvider(ToxI& t, ToxEventProviderI& tep) : _t(t), _tep(tep) {
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PACKET);
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET);
}
@ -261,6 +262,132 @@ bool NGCEXTEventProvider::parse_ft1_init_ack_v2(
);
}
bool NGCEXTEventProvider::parse_ft1_have(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
) {
if (!_private) {
std::cerr << "NGCEXT: ft1_have cant be public\n";
return false;
}
Events::NGCEXT_ft1_have e;
e.group_number = group_number;
e.peer_number = peer_number;
size_t curser = 0;
// - 4 byte (file_kind)
e.file_kind = 0u;
_DATA_HAVE(sizeof(e.file_kind), std::cerr << "NGCEXT: packet too small, missing file_kind\n"; return false)
for (size_t i = 0; i < sizeof(e.file_kind); i++, curser++) {
e.file_kind |= uint32_t(data[curser]) << (i*8);
}
// - X bytes (file_kind dependent id, different sizes)
uint16_t file_id_size = 0u;
_DATA_HAVE(sizeof(file_id_size), std::cerr << "NGCEXT: packet too small, missing file_id_size\n"; return false)
for (size_t i = 0; i < sizeof(file_id_size); i++, curser++) {
file_id_size |= uint32_t(data[curser]) << (i*8);
}
_DATA_HAVE(file_id_size, std::cerr << "NGCEXT: packet too small, missing file_id, or file_id_size too large\n"; return false)
e.file_id = {data+curser, data+curser+file_id_size};
curser += file_id_size;
// - array [
// - 4 bytes (chunk index)
// - ]
while (curser < data_size) {
_DATA_HAVE(sizeof(uint32_t), std::cerr << "NGCEXT: packet too small, broken chunk index\n"; return false)
uint32_t chunk_index = 0u;
for (size_t i = 0; i < sizeof(chunk_index); i++, curser++) {
chunk_index |= uint32_t(data[curser]) << (i*8);
}
e.chunks.push_back(chunk_index);
}
return dispatch(
NGCEXT_Event::FT1_HAVE,
e
);
}
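For reference, a sketch of the payload this parser walks, after the 1-byte packet id has been stripped (byte values illustrative; the numeric file_kind and the 20-byte sha1 file id are assumptions):
// 01 00 00 00    file_kind (4 bytes, little-endian; value illustrative)
// 14 00          file_id_size = 20 (2 bytes, little-endian)
// <20 bytes>     file_id (e.g. the sha1 info hash)
// 05 00 00 00    chunk index 5 (4 bytes, little-endian)
// 09 00 00 00    chunk index 9 (repeats until the end of the packet)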
bool NGCEXTEventProvider::parse_ft1_bitset(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
) {
if (!_private) {
std::cerr << "NGCEXT: ft1_bitset cant be public\n";
return false;
}
Events::NGCEXT_ft1_bitset e;
e.group_number = group_number;
e.peer_number = peer_number;
size_t curser = 0;
// - 4 byte (file_kind)
e.file_kind = 0u;
_DATA_HAVE(sizeof(e.file_kind), std::cerr << "NGCEXT: packet too small, missing file_kind\n"; return false)
for (size_t i = 0; i < sizeof(e.file_kind); i++, curser++) {
e.file_kind |= uint32_t(data[curser]) << (i*8);
}
// - X bytes (file_kind dependent id, different sizes)
uint16_t file_id_size = 0u;
_DATA_HAVE(sizeof(file_id_size), std::cerr << "NGCEXT: packet too small, missing file_id_size\n"; return false)
for (size_t i = 0; i < sizeof(file_id_size); i++, curser++) {
file_id_size |= uint32_t(data[curser]) << (i*8);
}
_DATA_HAVE(file_id_size, std::cerr << "NGCEXT: packet too small, missing file_id, or file_id_size too large\n"; return false)
e.file_id = {data+curser, data+curser+file_id_size};
curser += file_id_size;
e.start_chunk = 0u;
_DATA_HAVE(sizeof(e.start_chunk), std::cerr << "NGCEXT: packet too small, missing start_chunk\n"; return false)
for (size_t i = 0; i < sizeof(e.start_chunk); i++, curser++) {
e.start_chunk |= uint32_t(data[curser]) << (i*8);
}
// - X bytes
// - array [
// - 1 bit (have chunk)
// - ] (filled up with zero)
// high to low?
// simply rest of file packet
e.chunk_bitset = {data+curser, data+curser+(data_size-curser)};
return dispatch(
NGCEXT_Event::FT1_BITSET,
e
);
}
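The bit order within each bitset byte is left open above ("high to low?"). A minimal lookup sketch, assuming LSB-first order within each byte (an assumption, not settled by this diff):
// test whether absolute chunk index idx is set in a received bitset,
// assuming LSB-first bit order within each byte (hypothetical helper)
static bool bitset_test_lsb(const Events::NGCEXT_ft1_bitset& e, size_t idx) {
	if (idx < e.start_chunk) {
		return false;
	}
	const size_t rel = idx - e.start_chunk;
	const size_t byte_i = rel / 8;
	if (byte_i >= e.chunk_bitset.size()) {
		return false;
	}
	return (e.chunk_bitset[byte_i] >> (rel % 8)) & 0x1;
}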
bool NGCEXTEventProvider::parse_pc1_announce(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
) {
// can be public
Events::NGCEXT_pc1_announce e;
e.group_number = group_number;
e.peer_number = peer_number;
size_t curser = 0;
// - X bytes (id, different sizes)
e.id = {data+curser, data+curser+(data_size-curser)};
return dispatch(
NGCEXT_Event::PC1_ANNOUNCE,
e
);
}
bool NGCEXTEventProvider::handlePacket(
const uint32_t group_number,
const uint32_t peer_number,
@ -292,6 +419,12 @@ bool NGCEXTEventProvider::handlePacket(
return parse_ft1_data_ack(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::FT1_MESSAGE:
return parse_ft1_message(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::FT1_HAVE:
return parse_ft1_have(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::FT1_BITSET:
return parse_ft1_bitset(group_number, peer_number, data+1, data_size-1, _private);
case NGCEXT_Event::PC1_ANNOUNCE:
return parse_pc1_announce(group_number, peer_number, data+1, data_size-1, _private);
default:
return false;
}
@ -299,6 +432,257 @@ bool NGCEXTEventProvider::handlePacket(
return false;
}
bool NGCEXTEventProvider::send_ft1_request(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size
) {
// - 1 byte packet id
// - 4 byte file_kind
// - X bytes file_id
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_REQUEST));
for (size_t i = 0; i < sizeof(file_kind); i++) {
pkg.push_back((file_kind>>(i*8)) & 0xff);
}
for (size_t i = 0; i < file_id_size; i++) {
pkg.push_back(file_id[i]);
}
// lossless
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
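All send functions below repeat the same little-endian packing loop; a tiny hypothetical helper capturing the idiom (not part of this diff, shown only to document the byte order):
// append v to pkg little-endian, lowest byte first, matching the loops in this file
template<typename T>
static void push_le(std::vector<uint8_t>& pkg, T v) {
	for (size_t i = 0; i < sizeof(T); i++) {
		pkg.push_back((v >> (i*8)) & 0xff);
	}
}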
bool NGCEXTEventProvider::send_ft1_init(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
uint64_t file_size,
uint8_t transfer_id,
const uint8_t* file_id, size_t file_id_size
) {
// - 1 byte packet id
// - 4 byte (file_kind)
// - 8 bytes (data size)
// - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultaneous fts)
// - X bytes (file_kind dependent id, different sizes)
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_INIT));
for (size_t i = 0; i < sizeof(file_kind); i++) {
pkg.push_back((file_kind>>(i*8)) & 0xff);
}
for (size_t i = 0; i < sizeof(file_size); i++) {
pkg.push_back((file_size>>(i*8)) & 0xff);
}
pkg.push_back(transfer_id);
for (size_t i = 0; i < file_id_size; i++) {
pkg.push_back(file_id[i]);
}
// lossless
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
bool NGCEXTEventProvider::send_ft1_init_ack(
uint32_t group_number, uint32_t peer_number,
uint8_t transfer_id
) {
// - 1 byte packet id
// - 1 byte transfer_id
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_INIT_ACK));
pkg.push_back(transfer_id);
// - 2 bytes max_lossy_data_size
const uint16_t max_lossy_data_size = _t.toxGroupMaxCustomLossyPacketLength() - 4;
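// the -4 presumably accounts for the FT1_DATA header: 1 byte packet id,
// 1 byte transfer_id and 2 bytes sequence_id (an assumption, based on send_ft1_data below)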
for (size_t i = 0; i < sizeof(uint16_t); i++) {
pkg.push_back((max_lossy_data_size>>(i*8)) & 0xff);
}
// lossless
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
bool NGCEXTEventProvider::send_ft1_data(
uint32_t group_number, uint32_t peer_number,
uint8_t transfer_id,
uint16_t sequence_id,
const uint8_t* data, size_t data_size
) {
assert(data_size > 0);
// TODO
// check header_size+data_size <= max pkg size
std::vector<uint8_t> pkg;
pkg.reserve(2048); // saves a ton of allocations
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA));
pkg.push_back(transfer_id);
pkg.push_back(sequence_id & 0xff);
pkg.push_back((sequence_id >> (1*8)) & 0xff);
// TODO: optimize
for (size_t i = 0; i < data_size; i++) {
pkg.push_back(data[i]);
}
// lossy
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, false, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
bool NGCEXTEventProvider::send_ft1_data_ack(
uint32_t group_number, uint32_t peer_number,
uint8_t transfer_id,
const uint16_t* seq_ids, size_t seq_ids_size
) {
std::vector<uint8_t> pkg;
pkg.reserve(1+1+2*32); // more than 32 acks in a single pkg should be unlikely
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA_ACK));
pkg.push_back(transfer_id);
// TODO: optimize
for (size_t i = 0; i < seq_ids_size; i++) {
pkg.push_back(seq_ids[i] & 0xff);
pkg.push_back((seq_ids[i] >> (1*8)) & 0xff);
}
// lossy
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, false, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
bool NGCEXTEventProvider::send_all_ft1_message(
uint32_t group_number,
uint32_t message_id,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size
) {
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_MESSAGE));
for (size_t i = 0; i < sizeof(message_id); i++) {
pkg.push_back((message_id>>(i*8)) & 0xff);
}
for (size_t i = 0; i < sizeof(file_kind); i++) {
pkg.push_back((file_kind>>(i*8)) & 0xff);
}
for (size_t i = 0; i < file_id_size; i++) {
pkg.push_back(file_id[i]);
}
// lossless
return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK;
}
bool NGCEXTEventProvider::send_ft1_have(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size,
const uint32_t* chunks_data, size_t chunks_size
) {
// 16bit file id size
assert(file_id_size <= 0xffff);
if (file_id_size > 0xffff) {
return false;
}
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_HAVE));
for (size_t i = 0; i < sizeof(file_kind); i++) {
pkg.push_back((file_kind>>(i*8)) & 0xff);
}
// file id not last in packet, needs explicit size
const uint16_t file_id_size_cast = file_id_size;
for (size_t i = 0; i < sizeof(file_id_size_cast); i++) {
pkg.push_back((file_id_size_cast>>(i*8)) & 0xff);
}
for (size_t i = 0; i < file_id_size; i++) {
pkg.push_back(file_id[i]);
}
// rest is chunks
for (size_t i = 0; i < chunks_size; i++) {
for (size_t j = 0; j < sizeof(chunks_data[i]); j++) {
pkg.push_back((chunks_data[i]>>(j*8)) & 0xff);
}
}
// lossless
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
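A usage sketch (group/peer numbers and the caller context are hypothetical), bundling several chunk indices into one FT1_HAVE instead of sending one packet per chunk, as the "hack" commit above suggests should be done:
// from code holding a NGCEXTEventProvider& _neep:
const uint32_t chunks[] = {4, 7, 9};
_neep.send_ft1_have(group_number, peer_number, file_kind, file_id.data(), file_id.size(), chunks, 3);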
bool NGCEXTEventProvider::send_ft1_bitset(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size,
uint32_t start_chunk,
const uint8_t* bitset_data, size_t bitset_size // size is bytes
) {
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_BITSET));
for (size_t i = 0; i < sizeof(file_kind); i++) {
pkg.push_back((file_kind>>(i*8)) & 0xff);
}
// file id not last in packet, needs explicit size
const uint16_t file_id_size_cast = file_id_size;
for (size_t i = 0; i < sizeof(file_id_size_cast); i++) {
pkg.push_back((file_id_size_cast>>(i*8)) & 0xff);
}
for (size_t i = 0; i < file_id_size; i++) {
pkg.push_back(file_id[i]);
}
for (size_t i = 0; i < sizeof(start_chunk); i++) {
pkg.push_back((start_chunk>>(i*8)) & 0xff);
}
for (size_t i = 0; i < bitset_size; i++) {
pkg.push_back(bitset_data[i]);
}
// lossless
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
static std::vector<uint8_t> build_pc1_announce(const uint8_t* id_data, size_t id_size) {
// - 1 byte packet id
// - X bytes (id, different sizes)
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::PC1_ANNOUNCE));
for (size_t i = 0; i < id_size; i++) {
pkg.push_back(id_data[i]);
}
return pkg;
}
bool NGCEXTEventProvider::send_pc1_announce(
uint32_t group_number, uint32_t peer_number,
const uint8_t* id_data, size_t id_size
) {
auto pkg = build_pc1_announce(id_data, id_size);
std::cout << "NEEP: sending PC1_ANNOUNCE s:" << pkg.size() - sizeof(NGCEXT_Event::PC1_ANNOUNCE) << "\n";
// lossless?
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
bool NGCEXTEventProvider::send_all_pc1_announce(
uint32_t group_number,
const uint8_t* id_data, size_t id_size
) {
auto pkg = build_pc1_announce(id_data, id_size);
std::cout << "NEEP: sending all PC1_ANNOUNCE s:" << pkg.size() - sizeof(NGCEXT_Event::PC1_ANNOUNCE) << "\n";
// lossless?
return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK;
}
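For the sha1 file transfer, the announce id is the 4-byte file_kind followed by the info hash; this matches the construction in sha1_ngcft1.cpp further down. The resulting packet, sketched:
// a0             packet id (PC1_ANNOUNCE = 0x80 | 32)
// <4 bytes>      file_kind = HASH_SHA1_INFO, little-endian
// <20 bytes>     sha1 info hash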
bool NGCEXTEventProvider::onToxEvent(const Tox_Event_Group_Custom_Packet* e) {
const auto group_number = tox_event_group_custom_packet_get_group_number(e);
const auto peer_number = tox_event_group_custom_packet_get_peer_id(e);

View File

@ -3,6 +3,7 @@
// solanaceae port of tox_ngc_ext
#include <solanaceae/toxcore/tox_event_interface.hpp>
#include <solanaceae/toxcore/tox_interface.hpp>
#include <solanaceae/util/event_provider.hpp>
#include <solanaceae/toxcore/tox_key.hpp>
@ -119,7 +120,6 @@ namespace Events {
// - 4 byte (message_id)
uint32_t message_id;
// request the other side to initiate a FT
// - 4 byte (file_kind)
uint32_t file_kind;
@ -127,6 +127,49 @@ namespace Events {
std::vector<uint8_t> file_id;
};
struct NGCEXT_ft1_have {
uint32_t group_number;
uint32_t peer_number;
// - 4 byte (file_kind)
uint32_t file_kind;
// - X bytes (file_kind dependent id, different sizes)
std::vector<uint8_t> file_id;
// - array [
// - 4 bytes (chunk index)
// - ]
std::vector<uint32_t> chunks;
};
struct NGCEXT_ft1_bitset {
uint32_t group_number;
uint32_t peer_number;
// - 4 byte (file_kind)
uint32_t file_kind;
// - X bytes (file_kind dependent id, different sizes)
std::vector<uint8_t> file_id;
uint32_t start_chunk;
// - array [
// - 1 bit (have chunk)
// - ] (filled up with zero)
// high to low?
std::vector<uint8_t> chunk_bitset;
};
struct NGCEXT_pc1_announce {
uint32_t group_number;
uint32_t peer_number;
// - X bytes (id, different sizes)
std::vector<uint8_t> id;
};
} // Events
enum class NGCEXT_Event : uint8_t {
@ -186,11 +229,44 @@ enum class NGCEXT_Event : uint8_t {
// send file as message
// basically the opposite of request
// contains file_kind and file_id (and timestamp?)
// - 4 byte (message_id)
// - 4 byte (file_kind)
// - 4 bytes (message_id)
// - 4 bytes (file_kind)
// - X bytes (file_kind dependent id, different sizes)
FT1_MESSAGE,
// announce you have specified chunks, for given info
// this is info/chunk specific
// bundle these together to reduce overhead (like maybe every 16, max 1min)
// - 4 bytes (file_kind)
// - X bytes (file_kind dependent id, different sizes)
// - array [
// - 4 bytes (chunk index)
// - ]
FT1_HAVE,
// tell the other peer which chunks, for a given info you have
// compressed down to a bitset (in parts)
// supposed to be sent only once, on participation announcement, when there is mutual interest
// the other side always assumes you don't have a chunk until told otherwise,
// so you can be smart about what you send.
// - 4 bytes (file_kind)
// - X bytes (file_kind dependent id, different sizes)
// - 4 bytes (first chunk index in bitset)
// - array [
// - 1 bit (have chunk)
// - ] (filled up with zero)
FT1_BITSET,
// TODO: FT1_IDONTHAVE, tell a peer you no longer have said chunk
// TODO: FT1_REJECT, tell a peer you won't fulfil the request
// tell another peer that you are participating in X
// you can reply with PC1_ANNOUNCE to let the other side know you too are participating in X
// you should NOT announce often, since this hits peers that do not participate
// ft1 uses fk+id
// - x bytes (id, different sizes)
PC1_ANNOUNCE = 0x80 | 32u,
MAX
};
@ -204,15 +280,19 @@ struct NGCEXTEventI {
virtual bool onEvent(const Events::NGCEXT_ft1_data&) { return false; }
virtual bool onEvent(const Events::NGCEXT_ft1_data_ack&) { return false; }
virtual bool onEvent(const Events::NGCEXT_ft1_message&) { return false; }
virtual bool onEvent(const Events::NGCEXT_ft1_have&) { return false; }
virtual bool onEvent(const Events::NGCEXT_ft1_bitset&) { return false; }
virtual bool onEvent(const Events::NGCEXT_pc1_announce&) { return false; }
};
using NGCEXTEventProviderI = EventProviderI<NGCEXTEventI>;
class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI {
ToxI& _t;
ToxEventProviderI& _tep;
public:
NGCEXTEventProvider(ToxEventProviderI& tep/*, ToxI& t*/);
NGCEXTEventProvider(ToxI& t, ToxEventProviderI& tep);
protected:
bool parse_hs1_request_last_ids(
@ -269,6 +349,24 @@ class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI {
bool _private
);
bool parse_ft1_have(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
);
bool parse_ft1_bitset(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
);
bool parse_pc1_announce(
uint32_t group_number, uint32_t peer_number,
const uint8_t* data, size_t data_size,
bool _private
);
bool handlePacket(
const uint32_t group_number,
const uint32_t peer_number,
@ -277,6 +375,72 @@ class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI {
const bool _private
);
public: // send api
bool send_ft1_request(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size
);
bool send_ft1_init(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
uint64_t file_size,
uint8_t transfer_id,
const uint8_t* file_id, size_t file_id_size
);
bool send_ft1_init_ack(
uint32_t group_number, uint32_t peer_number,
uint8_t transfer_id
);
bool send_ft1_data(
uint32_t group_number, uint32_t peer_number,
uint8_t transfer_id,
uint16_t sequence_id,
const uint8_t* data, size_t data_size
);
bool send_ft1_data_ack(
uint32_t group_number, uint32_t peer_number,
uint8_t transfer_id,
const uint16_t* seq_ids, size_t seq_ids_size
);
// TODO: add private version
bool send_all_ft1_message(
uint32_t group_number,
uint32_t message_id,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size
);
bool send_ft1_have(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size,
const uint32_t* chunks_data, size_t chunks_size
);
bool send_ft1_bitset(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size,
uint32_t start_chunk,
const uint8_t* bitset_data, size_t bitset_size // size is bytes
);
bool send_pc1_announce(
uint32_t group_number, uint32_t peer_number,
const uint8_t* id_data, size_t id_size
);
bool send_all_pc1_announce(
uint32_t group_number,
const uint8_t* id_data, size_t id_size
);
protected:
bool onToxEvent(const Tox_Event_Group_Custom_Packet* e) override;
bool onToxEvent(const Tox_Event_Group_Custom_Private_Packet* e) override;

View File

@ -15,147 +15,6 @@
#include <cassert>
#include <vector>
bool NGCFT1::sendPKG_FT1_REQUEST(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size
) {
// - 1 byte packet id
// - 4 byte file_kind
// - X bytes file_id
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_REQUEST));
for (size_t i = 0; i < sizeof(file_kind); i++) {
pkg.push_back((file_kind>>(i*8)) & 0xff);
}
for (size_t i = 0; i < file_id_size; i++) {
pkg.push_back(file_id[i]);
}
// lossless
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
bool NGCFT1::sendPKG_FT1_INIT(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
uint64_t file_size,
uint8_t transfer_id,
const uint8_t* file_id, size_t file_id_size
) {
// - 1 byte packet id
// - 4 byte (file_kind)
// - 8 bytes (data size)
// - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts)
// - X bytes (file_kind dependent id, differnt sizes)
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_INIT));
for (size_t i = 0; i < sizeof(file_kind); i++) {
pkg.push_back((file_kind>>(i*8)) & 0xff);
}
for (size_t i = 0; i < sizeof(file_size); i++) {
pkg.push_back((file_size>>(i*8)) & 0xff);
}
pkg.push_back(transfer_id);
for (size_t i = 0; i < file_id_size; i++) {
pkg.push_back(file_id[i]);
}
// lossless
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
bool NGCFT1::sendPKG_FT1_INIT_ACK(
uint32_t group_number, uint32_t peer_number,
uint8_t transfer_id
) {
// - 1 byte packet id
// - 1 byte transfer_id
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_INIT_ACK));
pkg.push_back(transfer_id);
// - 2 bytes max_lossy_data_size
const uint16_t max_lossy_data_size = _t.toxGroupMaxCustomLossyPacketLength() - 4;
for (size_t i = 0; i < sizeof(uint16_t); i++) {
pkg.push_back((max_lossy_data_size>>(i*8)) & 0xff);
}
// lossless
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
bool NGCFT1::sendPKG_FT1_DATA(
uint32_t group_number, uint32_t peer_number,
uint8_t transfer_id,
uint16_t sequence_id,
const uint8_t* data, size_t data_size
) {
assert(data_size > 0);
// TODO
// check header_size+data_size <= max pkg size
std::vector<uint8_t> pkg;
pkg.reserve(2048); // saves a ton of allocations
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA));
pkg.push_back(transfer_id);
pkg.push_back(sequence_id & 0xff);
pkg.push_back((sequence_id >> (1*8)) & 0xff);
// TODO: optimize
for (size_t i = 0; i < data_size; i++) {
pkg.push_back(data[i]);
}
// lossy
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, false, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
bool NGCFT1::sendPKG_FT1_DATA_ACK(
uint32_t group_number, uint32_t peer_number,
uint8_t transfer_id,
const uint16_t* seq_ids, size_t seq_ids_size
) {
std::vector<uint8_t> pkg;
pkg.reserve(1+1+2*32); // 32acks in a single pkg should be unlikely
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_DATA_ACK));
pkg.push_back(transfer_id);
// TODO: optimize
for (size_t i = 0; i < seq_ids_size; i++) {
pkg.push_back(seq_ids[i] & 0xff);
pkg.push_back((seq_ids[i] >> (1*8)) & 0xff);
}
// lossy
return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, false, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK;
}
bool NGCFT1::sendPKG_FT1_MESSAGE(
uint32_t group_number,
uint32_t message_id,
uint32_t file_kind,
const uint8_t* file_id, size_t file_id_size
) {
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::FT1_MESSAGE));
for (size_t i = 0; i < sizeof(message_id); i++) {
pkg.push_back((message_id>>(i*8)) & 0xff);
}
for (size_t i = 0; i < sizeof(file_kind); i++) {
pkg.push_back((file_kind>>(i*8)) & 0xff);
}
for (size_t i = 0; i < file_id_size; i++) {
pkg.push_back(file_id[i]);
}
// lossless
return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK;
}
void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size) {
auto& tf_opt = peer.send_transfers.at(idx);
assert(tf_opt.has_value());
@ -181,7 +40,8 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
} else {
// timed out, resend
std::cerr << "NGCFT1 warning: ft init timed out, resending\n";
sendPKG_FT1_INIT(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size());
//sendPKG_FT1_INIT(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size());
_neep.send_ft1_init(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size());
tf.inits_sent++;
tf.time_since_activity = 0.f;
}
@ -191,7 +51,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
case State::FINISHING: // we still have unacked packets
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) {
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
_neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
@ -241,7 +101,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) {
// TODO: can fail
sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size());
_neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size());
peer.cca->onLoss({idx, id}, false);
time_since_activity = 0.f;
timeouts_set.erase({idx, id});
@ -278,7 +138,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
);
uint16_t seq_id = tf.ssb.add(std::move(new_data));
const bool sent = sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
const bool sent = _neep.send_ft1_data(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
if (sent) {
peer.cca->onSent({idx, seq_id}, chunk_size);
} else {
@ -338,7 +198,7 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
NGCFT1::NGCFT1(
ToxI& t,
ToxEventProviderI& tep,
NGCEXTEventProviderI& neep
NGCEXTEventProvider& neep
) : _t(t), _tep(tep), _neep(neep)
{
_neep.subscribe(this, NGCEXT_Event::FT1_REQUEST);
@ -392,7 +252,7 @@ void NGCFT1::NGC_FT1_send_request_private(
const uint8_t* file_id, size_t file_id_size
) {
// TODO: error check
sendPKG_FT1_REQUEST(group_number, peer_number, file_kind, file_id, file_id_size);
_neep.send_ft1_request(group_number, peer_number, file_kind, file_id, file_id_size);
}
bool NGCFT1::NGC_FT1_send_init_private(
@ -433,7 +293,7 @@ bool NGCFT1::NGC_FT1_send_init_private(
}
// TODO: check return value
sendPKG_FT1_INIT(group_number, peer_number, file_kind, file_size, idx, file_id, file_id_size);
_neep.send_ft1_init(group_number, peer_number, file_kind, file_size, idx, file_id, file_id_size);
peer.send_transfers[idx] = Group::Peer::SendTransfer{
file_kind,
@ -463,7 +323,7 @@ bool NGCFT1::NGC_FT1_send_message_public(
message_id = randombytes_random();
// TODO: check return value
return sendPKG_FT1_MESSAGE(group_number, message_id, file_kind, file_id, file_id_size);
return _neep.send_all_ft1_message(group_number, message_id, file_kind, file_id, file_id_size);
}
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) {
@ -506,7 +366,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
return true; // return true?
}
sendPKG_FT1_INIT_ACK(e.group_number, e.peer_number, e.transfer_id);
_neep.send_ft1_init_ack(e.group_number, e.peer_number, e.transfer_id);
std::cout << "NGCFT1: accepted init\n";
@ -628,7 +488,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
// TODO: check if this caps at max acks
if (!ack_seq_ids.empty()) {
// TODO: check return value
sendPKG_FT1_DATA_ACK(e.group_number, e.peer_number, e.transfer_id, ack_seq_ids.data(), ack_seq_ids.size());
_neep.send_ft1_data_ack(e.group_number, e.peer_number, e.transfer_id, ack_seq_ids.data(), ack_seq_ids.size());
}

View File

@ -131,7 +131,7 @@ using NGCFT1EventProviderI = EventProviderI<NGCFT1EventI>;
class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProviderI {
ToxI& _t;
ToxEventProviderI& _tep;
NGCEXTEventProviderI& _neep;
NGCEXTEventProvider& _neep; // not the interface?
std::default_random_engine _rng{std::random_device{}()};
@ -201,13 +201,6 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
std::map<uint32_t, Group> groups;
protected:
bool sendPKG_FT1_REQUEST(uint32_t group_number, uint32_t peer_number, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size);
bool sendPKG_FT1_INIT(uint32_t group_number, uint32_t peer_number, uint32_t file_kind, uint64_t file_size, uint8_t transfer_id, const uint8_t* file_id, size_t file_id_size);
bool sendPKG_FT1_INIT_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id);
bool sendPKG_FT1_DATA(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, uint16_t sequence_id, const uint8_t* data, size_t data_size);
bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size);
bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size);
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
@ -215,7 +208,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
NGCFT1(
ToxI& t,
ToxEventProviderI& tep,
NGCEXTEventProviderI& neep
NGCEXTEventProvider& neep
);
float iterate(float delta);

View File

@ -0,0 +1,178 @@
#pragma once
#include <algorithm>
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/object_store/object_store.hpp>
#include "./components.hpp"
#include <entt/container/dense_map.hpp>
#include <entt/container/dense_set.hpp>
#include <cstddef>
#include <cstdint>
#include <iostream>
//#include <solanaceae/ngc_ft1/ngcft1.hpp>
// goal is to always keep 2 transfers running and X(6) requests queued up
// per peer
// contact component?
struct ChunkPicker {
// max transfers
static constexpr size_t max_tf_info_requests {1};
static constexpr size_t max_tf_chunk_requests {2};
// max outstanding requests
// TODO: should this include transfers?
static constexpr size_t max_open_info_requests {1};
const size_t max_open_chunk_requests {6};
// TODO: handle with hash utils?
struct ParticipationEntry {
ParticipationEntry(void) {}
// skips in round robin -> lower should_skip => higher priority
uint16_t should_skip {2}; // 0 high, 8 low (double each time? 0,1,2,4,8)
uint16_t skips {2};
};
// TODO: only unfinished?
entt::dense_map<Object, ParticipationEntry> participating_unfinished;
entt::dense_set<Object> participating;
Object participating_in_last {entt::null};
// tick
//void sendInfoRequests();
// is this like a system?
// TODO: only update on:
// - transfer start?
// - transfer done
// - request timed out
// - reset on disconnect?
struct ContentChunkR {
ObjectHandle object;
size_t chunk_index;
};
// returns list of chunks to request
std::vector<ContentChunkR> updateChunkRequests(
Contact3Handle c,
ObjectRegistry& objreg,
size_t num_requests
//NGCFT1& nft
) {
std::vector<ContentChunkR> req_ret;
// count running tf and open requests
// TODO: replace num_requests with this logic
// while n < X
while (false && !participating_unfinished.empty()) {
// round robin content (remember last obj)
if (!objreg.valid(participating_in_last)) {
participating_in_last = participating_unfinished.begin()->first;
//participating_in_last = *participating_unfinished.begin();
}
assert(objreg.valid(participating_in_last));
auto it = participating_unfinished.find(participating_in_last);
// hard limit robin rounds to array size times 100
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*100; i++) {
if (it == participating_unfinished.end()) {
it = participating_unfinished.begin();
}
if (it->second.skips < it->second.should_skip) {
it->second.skips++;
continue;
}
ObjectHandle o {objreg, it->first};
// intersect self have with other have
if (!o.all_of<Components::RemoteHave, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
// rare case where no one else has anything
continue;
}
const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
if (cc.have_all) {
std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n";
continue;
}
const auto& others_have = o.get<Components::RemoteHave>().others;
auto other_it = others_have.find(c);
if (other_it == others_have.end()) {
// rare case where the other is participating but has nothing
continue;
}
const auto& other_have = other_it->second;
BitSet chunk_candidates = cc.have_chunk;
if (!other_have.have_all) {
// AND is the same as ~(~A | ~B)
// that means we can leave chunk_candidates as-is (have is the inverted want)
// merge is or
// invert at the end
chunk_candidates
.merge(other_have.have.invert())
.invert();
// TODO: add intersect for more perf
} else {
chunk_candidates.invert();
}
const auto total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
// TODO: trim off the bits from rounding up to 8, since they are now always set
// now select (globally) unrequested other have
// TODO: pick strategies
// TODO: how do we prioritize within a file?
// - first (walk from start (or readhead?))
// - random (choose random start pos and walk)
// - rarest (keep track of rarity and sort by that)
// - streaming (use read head to determine time-critical chunks, potentially over-requesting; first (relative to stream head) otherwise)
// maybe look into libtorrent's deadline stuff
// - arbitrary priority maps/functions (and combine with above in ratios)
// simple, we use first
for (size_t i = 0; i < total_chunks && req_ret.size() < num_requests && i < chunk_candidates.size_bits(); i++) {
if (!chunk_candidates[i]) {
continue;
}
// i is a candidate we can request from the peer
// first check against double requests
if (std::find_if(req_ret.cbegin(), req_ret.cend(), [&](const auto& x) -> bool {
return x.object == o && x.chunk_index == i;
}) != req_ret.cend()) {
// already in return array
// how did we get here? should we fast exit? if simple-first strat, we would want to
continue; // skip
}
// TODO: also check against globally running transfers!!!
// if nothing else blocks this, add to ret
req_ret.push_back(ContentChunkR{o, i});
}
}
}
// -- no -- (just compat with old code, ignore)
// if n < X
// optimistically request 1 chunk other does not have
// (don't mark as requested? or lower cooldown to re-request?)
return req_ret;
}
// - reset on disconnect?
void resetPeer(
Contact3Handle c
);
};
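A condensed sketch of the candidate selection above (BitSet semantics as used in this file: merge is bitwise OR, invert flips every bit):
// chunks we still want that the peer has:
// A AND B == ~(~A | ~B), with A == ~self_have (our wants) and B == other_have
BitSet chunk_candidates = self_have; // == ~A
chunk_candidates
	.merge(other_have.invert()) // ~A | ~B
	.invert(); // ~(~A | ~B) == A AND B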

View File

@ -0,0 +1,25 @@
#include "./components.hpp"
std::vector<size_t> Components::FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const {
const auto it = chunk_hash_to_index.find(hash);
if (it != chunk_hash_to_index.cend()) {
return it->second;
} else {
return {};
}
}
bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
if (have_all) { // short cut
return true;
}
if (auto i_vec = chunkIndices(hash); !i_vec.empty()) {
// TODO: should i test all?
return have_chunk[i_vec.front()];
}
// not part of this file
return false;
}

View File

@ -0,0 +1,81 @@
#pragma once
#include <solanaceae/contact/components.hpp>
#include <solanaceae/message3/components.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/util/bitset.hpp>
#include <entt/container/dense_set.hpp>
#include "./ft1_sha1_info.hpp"
#include "./hash_utils.hpp"
#include <vector>
// TODO: rename to object components
namespace Components {
struct Messages {
// dense set instead?
std::vector<Message3Handle> messages;
};
using FT1InfoSHA1 = FT1InfoSHA1;
struct FT1InfoSHA1Data {
std::vector<uint8_t> data;
};
struct FT1InfoSHA1Hash {
std::vector<uint8_t> hash;
};
struct FT1ChunkSHA1Cache {
// TODO: extract have_chunk, have_all and have_count to generic comp
// have_chunk has the size of info.chunks.size(), or is empty if have_all
// keep in mind BitSet rounds up to multiples of 8
BitSet have_chunk{0};
bool have_all {false};
size_t have_count {0};
entt::dense_map<SHA1Digest, std::vector<size_t>> chunk_hash_to_index;
std::vector<size_t> chunkIndices(const SHA1Digest& hash) const;
bool haveChunk(const SHA1Digest& hash) const;
};
struct FT1ChunkSHA1Requested {
// requested chunks with a timer since last request
entt::dense_map<size_t, float> chunks;
};
// TODO: once announce is shipped, remove the "Suspected"
struct SuspectedParticipants {
entt::dense_set<Contact3> participants;
};
struct RemoteHave {
struct Entry {
bool have_all {false};
BitSet have;
};
entt::dense_map<Contact3, Entry> others;
};
struct ReRequestInfoTimer {
float timer {0.f};
};
struct ReadHeadHint {
// points to the first byte we want
// this is just a hint, that can be set from outside
// to guide the sequential "piece picker" strategy
// the strategy *should* set this to the first byte we don't yet have
uint64_t offset_into_file {0u};
};
} // Components
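Since BitSet rounds its storage up to whole bytes, a cache built for N chunks can expose a few trailing padding bits; an illustration (BitSet semantics assumed from its use in this diff):
// BitSet have{10}; // room for 10 chunks...
// // ...but backing storage is 2 bytes == 16 bits; bits 10..15 are padding
// // and loops must clamp to info.chunks.size(), as the chunk loops in sha1_ngcft1.cpp do.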

View File

@ -18,7 +18,7 @@ struct SHA1Digest {
bool operator==(const SHA1Digest& other) const { return data == other.data; }
bool operator!=(const SHA1Digest& other) const { return data != other.data; }
size_t size(void) const { return data.size(); }
constexpr size_t size(void) const { return data.size(); }
};
std::ostream& operator<<(std::ostream& out, const SHA1Digest& v);

View File

@ -0,0 +1,34 @@
#include "./participation.hpp"
#include "./chunk_picker.hpp"
bool addParticipation(Contact3Handle c, ObjectHandle o) {
bool was_new {false};
if (static_cast<bool>(o)) {
const auto [_, inserted] = o.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
was_new = inserted;
}
if (static_cast<bool>(c)) {
const auto [_, inserted] = c.get_or_emplace<ChunkPicker>().participating.emplace(o);
was_new = was_new || inserted;
// TODO: if not have_all
c.get_or_emplace<ChunkPicker>().participating_unfinished.emplace(o, ChunkPicker::ParticipationEntry{});
}
return was_new;
}
void removeParticipation(Contact3Handle c, ObjectHandle o) {
if (static_cast<bool>(o) && o.all_of<Components::SuspectedParticipants>()) {
o.get<Components::SuspectedParticipants>().participants.erase(c);
}
if (static_cast<bool>(c) && c.all_of<ChunkPicker>()) {
c.get<ChunkPicker>().participating.erase(o);
c.get<ChunkPicker>().participating_unfinished.erase(o);
}
}

View File

@ -0,0 +1,8 @@
#pragma once
#include <solanaceae/object_store/object_store.hpp>
#include <solanaceae/contact/contact_model3.hpp>
bool addParticipation(Contact3Handle c, ObjectHandle o);
void removeParticipation(Contact3Handle c, ObjectHandle o);

View File

@ -0,0 +1,122 @@
#include "./receiving_transfers.hpp"
#include <iostream>
void ReceivingTransfers::tick(float delta) {
for (auto peer_it = _data.begin(); peer_it != _data.end();) {
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
it->second.time_since_activity += delta;
// if we have not heard for 20sec, timeout
if (it->second.time_since_activity >= 20.f) {
std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n";
// TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading
//it->second.v
it = peer_it->second.erase(it);
} else {
it++;
}
}
if (peer_it->second.empty()) {
// cleanup unused peers, too aggressive?
peer_it = _data.erase(peer_it);
} else {
peer_it++;
}
}
}
ReceivingTransfers::Entry& ReceivingTransfers::emplaceInfo(uint64_t combined_id, uint8_t transfer_id, const Entry::Info& info) {
auto& ent = _data[combined_id][transfer_id];
ent.v = info;
return ent;
}
ReceivingTransfers::Entry& ReceivingTransfers::emplaceChunk(uint64_t combined_id, uint8_t transfer_id, const Entry::Chunk& chunk) {
assert(!chunk.chunk_indices.empty());
assert(!containsPeerChunk(combined_id, chunk.content, chunk.chunk_indices.front()));
auto& ent = _data[combined_id][transfer_id];
ent.v = chunk;
return ent;
}
bool ReceivingTransfers::containsPeerTransfer(uint64_t combined_id, uint8_t transfer_id) const {
auto it = _data.find(combined_id);
if (it == _data.end()) {
return false;
}
return it->second.count(transfer_id);
}
bool ReceivingTransfers::containsChunk(ObjectHandle o, size_t chunk_idx) const {
for (const auto& [_, p] : _data) {
for (const auto& [_2, v] : p) {
if (!v.isChunk()) {
continue;
}
const auto& c = v.getChunk();
if (c.content != o) {
continue;
}
for (const auto idx : c.chunk_indices) {
if (idx == chunk_idx) {
return true;
}
}
}
}
return false;
}
bool ReceivingTransfers::containsPeerChunk(uint64_t combined_id, ObjectHandle o, size_t chunk_idx) const {
auto it = _data.find(combined_id);
if (it == _data.end()) {
return false;
}
for (const auto& [_, v] : it->second) {
if (!v.isChunk()) {
continue;
}
const auto& c = v.getChunk();
if (c.content != o) {
continue;
}
for (const auto idx : c.chunk_indices) {
if (idx == chunk_idx) {
return true;
}
}
}
return false;
}
void ReceivingTransfers::removePeer(uint64_t combined_id) {
_data.erase(combined_id);
}
void ReceivingTransfers::removePeerTransfer(uint64_t combined_id, uint8_t transfer_id) {
auto it = _data.find(combined_id);
if (it == _data.end()) {
return;
}
it->second.erase(transfer_id);
}
size_t ReceivingTransfers::size(void) const {
size_t count {0};
for (const auto& [_, p] : _data) {
count += p.size();
}
return count;
}

View File

@ -0,0 +1,63 @@
#pragma once
#include <solanaceae/object_store/object_store.hpp>
#include <entt/container/dense_map.hpp>
#include <cstdint>
#include <variant>
#include <vector>
struct ReceivingTransfers {
struct Entry {
struct Info {
ObjectHandle content;
// copy of info data
// too large?
std::vector<uint8_t> info_data;
};
struct Chunk {
ObjectHandle content;
std::vector<size_t> chunk_indices;
// or data?
// if memmapped, this would be just a pointer
};
std::variant<Info, Chunk> v;
float time_since_activity {0.f};
bool isInfo(void) const { return std::holds_alternative<Info>(v); }
bool isChunk(void) const { return std::holds_alternative<Chunk>(v); }
Info& getInfo(void) { return std::get<Info>(v); }
const Info& getInfo(void) const { return std::get<Info>(v); }
Chunk& getChunk(void) { return std::get<Chunk>(v); }
const Chunk& getChunk(void) const { return std::get<Chunk>(v); }
};
// key is groupid + peerid
// TODO: replace with contact
//using ReceivingTransfers = entt::dense_map<uint64_t, entt::dense_map<uint8_t, ReceivingTransferE>>;
entt::dense_map<uint64_t, entt::dense_map<uint8_t, Entry>> _data;
void tick(float delta);
Entry& emplaceInfo(uint64_t combined_id, uint8_t transfer_id, const Entry::Info& info);
Entry& emplaceChunk(uint64_t combined_id, uint8_t transfer_id, const Entry::Chunk& chunk);
bool containsPeer(uint64_t combined_id) const { return _data.count(combined_id); }
bool containsPeerTransfer(uint64_t combined_id, uint8_t transfer_id) const;
bool containsChunk(ObjectHandle o, size_t chunk_idx) const;
bool containsPeerChunk(uint64_t combined_id, ObjectHandle o, size_t chunk_idx) const;
auto& getPeer(uint64_t combined_id) { return _data.at(combined_id); }
auto& getTransfer(uint64_t combined_id, uint8_t transfer_id) { return getPeer(combined_id).at(transfer_id); }
void removePeer(uint64_t combined_id);
void removePeerTransfer(uint64_t combined_id, uint8_t transfer_id);
size_t size(void) const;
};
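combineIds is used as the peer key throughout but is not shown in this diff; a plausible sketch (an assumption about its packing):
// pack group and peer number into the uint64_t key used above (hypothetical)
static uint64_t combineIds(uint32_t group_number, uint32_t peer_number) {
	return (uint64_t(group_number) << 32) | peer_number;
}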

View File

@ -16,11 +16,16 @@
#include "./file_rw_mapped.hpp"
#include "./components.hpp"
#include "./chunk_picker.hpp"
#include "./participation.hpp"
#include <iostream>
#include <variant>
#include <filesystem>
#include <mutex>
#include <future>
#include <vector>
namespace Message::Components {
@ -28,79 +33,6 @@ namespace Message::Components {
} // Message::Components
// TODO: rename to object components
namespace Components {
struct Messages {
std::vector<Message3Handle> messages;
};
using FT1InfoSHA1 = FT1InfoSHA1;
struct FT1InfoSHA1Data {
std::vector<uint8_t> data;
};
struct FT1InfoSHA1Hash {
std::vector<uint8_t> hash;
};
struct FT1ChunkSHA1Cache {
std::vector<bool> have_chunk;
bool have_all {false};
size_t have_count {0};
entt::dense_map<SHA1Digest, std::vector<size_t>> chunk_hash_to_index;
std::vector<size_t> chunkIndices(const SHA1Digest& hash) const;
bool haveChunk(const SHA1Digest& hash) const;
};
struct FT1ChunkSHA1Requested {
// requested chunks with a timer since last request
entt::dense_map<size_t, float> chunks;
};
struct SuspectedParticipants {
entt::dense_set<Contact3> participants;
};
struct ReRequestInfoTimer {
float timer {0.f};
};
struct ReadHeadHint {
// points to the first byte we want
// this is just a hint, that can be set from outside
// to guide the sequential "piece picker" strategy
// the strategy *should* set this to the first byte we dont yet have
uint64_t offset_into_file {0u};
};
} // Components
std::vector<size_t> Components::FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const {
const auto it = chunk_hash_to_index.find(hash);
if (it != chunk_hash_to_index.cend()) {
return it->second;
} else {
return {};
}
}
bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
if (have_all) { // short cut
return true;
}
if (auto i_vec = chunkIndices(hash); !i_vec.empty()) {
// TODO: should i test all?
return have_chunk[i_vec.front()];
}
// not part of this file
return false;
}
static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
if (chunk_index+1 == sha1_info.chunks.size()) {
// last chunk
@ -185,7 +117,6 @@ void SHA1_NGCFT1::updateMessages(ObjectHandle ce) {
std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ObjectHandle ce) {
// get a list of peers we can request this file from
// TODO: randomly request from non SuspectedParticipants
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
// TODO: sort by con state?
@ -200,10 +131,11 @@ std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(O
}
}
// 1 in 20 chance to ask random peer instead
// 1 in 40 chance to ask random peer instead
// TODO: config + tweak
// TODO: save group in content to avoid the tox_peers list build
if (tox_peers.empty() || (_rng()%20) == 0) {
// TODO: remove once pc1_announce is shipped
if (tox_peers.empty() || (_rng()%40) == 0) {
// meh
// HACK: determine group based on last tox_peers
if (!tox_peers.empty()) {
@ -250,13 +182,17 @@ SHA1_NGCFT1::SHA1_NGCFT1(
Contact3Registry& cr,
RegistryMessageModel& rmm,
NGCFT1& nft,
ToxContactModel2& tcm
ToxContactModel2& tcm,
ToxEventProviderI& tep,
NGCEXTEventProvider& neep
) :
_os(os),
_cr(cr),
_rmm(rmm),
_nft(nft),
_tcm(tcm)
_tcm(tcm),
_tep(tep),
_neep(neep)
{
// TODO: also create and destroy
_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
@ -274,6 +210,12 @@ SHA1_NGCFT1::SHA1_NGCFT1(
//_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);
_rmm.subscribe(this, RegistryMessageModel_Event::send_file_path);
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT);
_neep.subscribe(this, NGCEXT_Event::PC1_ANNOUNCE);
_neep.subscribe(this, NGCEXT_Event::FT1_HAVE);
_neep.subscribe(this, NGCEXT_Event::FT1_BITSET);
}
void SHA1_NGCFT1::iterate(float delta) {
@ -314,28 +256,7 @@ void SHA1_NGCFT1::iterate(float delta) {
}
// receiving transfers
for (auto peer_it = _receiving_transfers.begin(); peer_it != _receiving_transfers.end();) {
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
it->second.time_since_activity += delta;
// if we have not heard for 10sec, timeout
if (it->second.time_since_activity >= 10.f) {
std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n";
// TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading
//it->second.v
it = peer_it->second.erase(it);
} else {
it++;
}
}
if (peer_it->second.empty()) {
// cleanup unused peers too agressive?
peer_it = _receiving_transfers.erase(peer_it);
} else {
peer_it++;
}
}
_receiving_transfers.tick(delta);
// queued requests
for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) {
@ -388,10 +309,7 @@ void SHA1_NGCFT1::iterate(float delta) {
for (const auto& [_, transfers] : _sending_transfers) {
running_sending_transfer_count += transfers.size();
}
size_t running_receiving_transfer_count {0};
for (const auto& [_, transfers] : _receiving_transfers) {
running_receiving_transfer_count += transfers.size();
}
size_t running_receiving_transfer_count {_receiving_transfers.size()};
if (running_sending_transfer_count < _max_concurrent_out) {
// TODO: for each peer? transfer cap per peer?
@ -484,8 +402,12 @@ void SHA1_NGCFT1::iterate(float delta) {
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
const auto& info = ce.get<Components::FT1InfoSHA1>();
if (cc.have_all) {
_queue_content_want_chunk.pop_front();
} else {
// naive, choose first chunk we don't have (double requests!!)
for (size_t chunk_idx = 0; chunk_idx < cc.have_chunk.size(); chunk_idx++) {
// TODO: piece picker, choose what others have (invert selectPeerForRequest)
for (size_t chunk_idx = 0; chunk_idx < info.chunks.size() /* cc.total_ */; chunk_idx++) {
if (cc.have_chunk[chunk_idx]) {
continue;
}
@ -493,11 +415,11 @@ void SHA1_NGCFT1::iterate(float delta) {
// check by hash
if (cc.haveChunk(info.chunks.at(chunk_idx))) {
// TODO: fix this, a completed chunk should fill all the indices it occupies
cc.have_chunk[chunk_idx] = true;
cc.have_chunk.set(chunk_idx);
cc.have_count += 1;
if (cc.have_count == info.chunks.size()) {
cc.have_all = true;
cc.have_chunk.clear();
cc.have_chunk = BitSet(0); // conserve space
break;
}
continue;
@ -532,6 +454,7 @@ void SHA1_NGCFT1::iterate(float delta) {
}
}
}
}
bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
// see tox_transfer_manager.cpp for reference
@ -578,6 +501,7 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
{ // next, create chuck cache and check for existing data
auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
auto& bytes_received = ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total;
cc.have_chunk = BitSet(info.chunks.size());
cc.have_all = false;
cc.have_count = 0;
@ -594,9 +518,8 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
const auto data_hash = SHA1Digest{hash_sha1(existing_data.ptr, existing_data.size)};
const bool data_equal = data_hash == info.chunks.at(i);
cc.have_chunk.push_back(data_equal);
if (data_equal) {
cc.have_chunk.set(i);
cc.have_count += 1;
bytes_received += chunk_size;
//std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n";
@ -618,7 +541,6 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
}
} else {
for (size_t i = 0; i < info.chunks.size(); i++) {
cc.have_chunk.push_back(false);
_chunks[info.chunks[i]] = ce;
cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
}
@ -632,6 +554,34 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
// announce we are participating
// since this is the first time, we publicly announce to all
if (e.e.all_of<Message::Components::ContactFrom, Message::Components::ContactTo>()) {
const auto c_f = e.e.get<Message::Components::ContactFrom>().c;
const auto c_t = e.e.get<Message::Components::ContactTo>().c;
std::vector<uint8_t> announce_id;
const uint32_t file_kind = static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO);
for (size_t i = 0; i < sizeof(file_kind); i++) {
announce_id.push_back((file_kind>>(i*8)) & 0xff);
}
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;
announce_id.insert(announce_id.cend(), info_hash.cbegin(), info_hash.cend());
if (_cr.all_of<Contact::Components::ToxGroupEphemeral>(c_t)) {
// public
const auto group_number = _cr.get<Contact::Components::ToxGroupEphemeral>(c_t).group_number;
_neep.send_all_pc1_announce(group_number, announce_id.data(), announce_id.size());
} else if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c_f)) {
// private ?
const auto [group_number, peer_number] = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c_f);
_neep.send_pc1_announce(group_number, peer_number, announce_id.data(), announce_id.size());
}
}
ce.remove<Message::Components::Transfer::TagPaused>();
// should?
@ -701,7 +651,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
{ // they advertise interest in the content
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
addParticipation(c, ce);
}
assert(ce.all_of<Components::FT1ChunkSHA1Cache>());
@ -748,10 +698,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
return false;
}
_receiving_transfers
[combineIds(e.group_number, e.peer_number)]
[e.transfer_id]
.v = ReceivingTransfer::Info{ce, std::vector<uint8_t>(e.file_size)};
_receiving_transfers.emplaceInfo(
combineIds(e.group_number, e.peer_number),
e.transfer_id,
{ce, std::vector<uint8_t>(e.file_size)}
);
e.accept = true;
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
@ -768,7 +719,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
{ // they have the content (probably, might be fake, should move this to done)
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
addParticipation(c, ce);
}
assert(ce.all_of<Components::FT1InfoSHA1>());
@ -791,10 +742,11 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
// TODO: check e.file_size
assert(e.file_size == info.chunkSize(idx_vec.front()));
_receiving_transfers
[combineIds(e.group_number, e.peer_number)]
[e.transfer_id]
.v = ReceivingTransfer::Chunk{ce, idx_vec};
_receiving_transfers.emplaceChunk(
combineIds(e.group_number, e.peer_number),
e.transfer_id,
ReceivingTransfers::Entry::Chunk{ce, idx_vec}
);
e.accept = true;
@ -807,31 +759,28 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
}
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
if (!_receiving_transfers.containsPeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id)) {
return false;
}
auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
if (!peer_transfers.count(e.transfer_id)) {
return false;
}
auto& transfer = _receiving_transfers.getTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id);
auto& tv = peer_transfers[e.transfer_id].v;
peer_transfers[e.transfer_id].time_since_activity = 0.f;
if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
auto& info_data = std::get<ReceivingTransfer::Info>(tv).info_data;
transfer.time_since_activity = 0.f;
if (transfer.isInfo()) {
auto& info_data = transfer.getInfo().info_data;
for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
info_data[i+e.data_offset] = e.data[i];
}
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
} else if (transfer.isChunk()) {
auto o = transfer.getChunk().content;
assert(ce.all_of<Message::Components::Transfer::File>());
auto* file = ce.get<Message::Components::Transfer::File>().get();
assert(o.all_of<Message::Components::Transfer::File>());
auto* file = o.get<Message::Components::Transfer::File>().get();
assert(file != nullptr);
for (const auto chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
const auto offset_into_file = chunk_index* ce.get<Components::FT1InfoSHA1>().chunk_size;
const auto chunk_size = o.get<Components::FT1InfoSHA1>().chunk_size;
for (const auto chunk_index : transfer.getChunk().chunk_indices) {
const auto offset_into_file = chunk_index * chunk_size;
if (!file->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
@ -897,72 +846,68 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
}
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
if (!_receiving_transfers.containsPeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id)) {
return false;
}
auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
if (!peer_transfers.count(e.transfer_id)) {
return false;
}
auto& transfer = _receiving_transfers.getTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id);
const auto& tv = peer_transfers[e.transfer_id].v;
if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
auto& info = std::get<ReceivingTransfer::Info>(tv);
auto ce = info.content;
if (transfer.isInfo()) {
auto& info = transfer.getInfo();
auto o = info.content;
if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data>()) {
if (o.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data>()) {
// we already have the info, discard
peer_transfers.erase(e.transfer_id);
_receiving_transfers.removePeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id);
return true;
}
// check if data matches hash
auto hash = hash_sha1(info.info_data.data(), info.info_data.size());
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
if (ce.get<Components::FT1InfoSHA1Hash>().hash != hash) {
assert(o.all_of<Components::FT1InfoSHA1Hash>());
if (o.get<Components::FT1InfoSHA1Hash>().hash != hash) {
std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n";
// requeue info request
peer_transfers.erase(e.transfer_id);
// TODO: requeue info request; eg manipulate o.get<Components::ReRequestInfoTimer>();
_receiving_transfers.removePeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id);
return true;
}
const auto& info_data = ce.emplace_or_replace<Components::FT1InfoSHA1Data>(std::move(info.info_data)).data;
auto& ft_info = ce.emplace_or_replace<Components::FT1InfoSHA1>();
const auto& info_data = o.emplace_or_replace<Components::FT1InfoSHA1Data>(std::move(info.info_data)).data;
auto& ft_info = o.emplace_or_replace<Components::FT1InfoSHA1>();
ft_info.fromBuffer(info_data);
{ // file info
// TODO: avoid overwriting file_info? it should be the same
auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
auto& file_info = o.emplace_or_replace<Message::Components::Transfer::FileInfo>();
file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
file_info.total_size = ft_info.file_size;
}
std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";
ce.remove<Components::ReRequestInfoTimer>();
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) {
o.remove<Components::ReRequestInfoTimer>();
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), o); it != _queue_content_want_info.end()) {
_queue_content_want_info.erase(it);
}
ce.emplace_or_replace<Message::Components::Transfer::TagPaused>();
o.emplace_or_replace<Message::Components::Transfer::TagPaused>();
updateMessages(ce);
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
const auto& info = ce.get<Components::FT1InfoSHA1>();
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
updateMessages(o);
} else if (transfer.isChunk()) {
auto o = transfer.getChunk().content;
const auto& info = o.get<Components::FT1InfoSHA1>();
auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
// HACK: only check first chunk (they *should* all be the same)
const auto chunk_index = std::get<ReceivingTransfer::Chunk>(tv).chunk_indices.front();
const auto chunk_index = transfer.getChunk().chunk_indices.front();
const uint64_t offset_into_file = chunk_index * uint64_t(info.chunk_size);
assert(chunk_index < info.chunks.size());
const auto chunk_size = info.chunkSize(chunk_index);
assert(offset_into_file+chunk_size <= info.file_size);
const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(chunk_size, offset_into_file);
const auto chunk_data = o.get<Message::Components::Transfer::File>()->read(chunk_size, offset_into_file);
assert(!chunk_data.empty());
// check hash of chunk
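The elided lines compute got_hash over the received chunk data and compare it against the expected digest from the info. Conceptually (the member names of the read result are assumed here):
// conceptual sketch of the elided verification
const auto got_hash = hash_sha1(chunk_data.ptr, chunk_data.size); // .ptr assumed
if (info.chunks.at(chunk_index) != got_hash) {
	// mismatch: drop the data, the chunk will be requested again
}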
@ -971,34 +916,70 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
if (!cc.have_all) {
for (const auto inner_chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
if (!cc.have_all && !cc.have_chunk.at(inner_chunk_index)) {
cc.have_chunk.at(inner_chunk_index) = true;
for (const auto inner_chunk_index : transfer.getChunk().chunk_indices) {
if (!cc.have_all && !cc.have_chunk[inner_chunk_index]) {
cc.have_chunk.set(inner_chunk_index);
cc.have_count += 1;
if (cc.have_count == info.chunks.size()) {
// debug check
for ([[maybe_unused]] const bool it : cc.have_chunk) {
assert(it);
for ([[maybe_unused]] size_t i = 0; i < info.chunks.size(); i++) {
assert(cc.have_chunk[i]);
}
cc.have_all = true;
cc.have_chunk.clear(); // not wasting memory
cc.have_chunk = BitSet(0); // not wasting memory
std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";
// HACK: remap the file to release RAM
// TODO: error checking
ce.get<Message::Components::Transfer::File>() = std::make_unique<File2RWMapped>(
ce.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(),
o.get<Message::Components::Transfer::File>() = std::make_unique<File2RWMapped>(
o.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(),
info.file_size
);
}
// good chunk
// TODO: have wasted + metadata
ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size;
o.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size;
}
}
// queue chunk have for all participants
// HACK: send immediately to all participants
for (const auto c_part : o.get<Components::SuspectedParticipants>().participants) {
if (!_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c_part)) {
continue;
}
const auto [part_group_number, part_peer_number] = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c_part);
const auto& info_hash = o.get<Components::FT1InfoSHA1Hash>().hash;
// convert size_t to uint32_t
const std::vector<uint32_t> chunk_indices {
transfer.getChunk().chunk_indices.cbegin(),
transfer.getChunk().chunk_indices.cend()
};
_neep.send_ft1_have(
part_group_number, part_peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
info_hash.data(), info_hash.size(),
chunk_indices.data(), chunk_indices.size()
);
}
if (!cc.have_all) { // debug print self have set
std::cout << "DEBUG print have bitset: s:" << cc.have_chunk.size_bits();
for (size_t i = 0; i < cc.have_chunk.size_bytes(); i++) {
if (i % 16 == 0) {
std::cout << "\n";
}
std::cout << std::hex << (uint16_t)cc.have_chunk.data()[i] << " ";
}
std::cout << std::dec << "\n";
}
} else {
std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
}
@ -1009,14 +990,14 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
// remove from requested
// TODO: remove at init and track running transfers differently
for (const auto it : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
for (const auto it : transfer.getChunk().chunk_indices) {
o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
}
updateMessages(ce); // mostly for received bytes
updateMessages(o); // mostly for received bytes
}
peer_transfers.erase(e.transfer_id);
_receiving_transfers.removePeerTransfer(combineIds(e.group_number, e.peer_number), e.transfer_id);
return true;
}
@ -1058,6 +1039,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
Message3Registry& reg = *reg_ptr;
// TODO: check for existence, hs or other syncing mechanics might have sent it already (or like, it arrived 2x or whatever)
// TODO: use the message dup test provided via rmm
auto new_msg_e = reg.create();
{ // contact
@ -1125,7 +1107,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
ce.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
reg_ptr->emplace<Message::Components::Content>(new_msg_e, ce);
ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
// HACK: assume the message sender is participating. usually a safe bet.
addParticipation(c, ce);
if (!ce.all_of<Components::ReRequestInfoTimer>() && !ce.all_of<Components::FT1InfoSHA1>()) {
// TODO: check if already receiving
@ -1426,3 +1409,219 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
return true;
}
bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
const auto group_number = tox_event_group_peer_exit_get_group_number(e);
const auto peer_number = tox_event_group_peer_exit_get_peer_id(e);
// peer disconnected
// - remove from all participations
auto c = _tcm.getContactGroupPeer(group_number, peer_number);
if (!static_cast<bool>(c)) {
return false;
}
for (const auto& [_, o] : _info_to_content) {
removeParticipation(c, o);
if (o.all_of<Components::RemoteHave>()) {
o.get<Components::RemoteHave>().others.erase(c);
}
}
// - clear queues
for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) {
if (group_number == std::get<0>(*it) && peer_number == std::get<1>(*it)) {
it = _queue_requested_chunk.erase(it);
} else {
it++;
}
}
// TODO: ngcft1 should have fired receive/send done events for all the running transfers
return false;
}
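removeParticipation lives in participation.cpp next to addParticipation; a plausible minimal shape, assuming it only touches the SuspectedParticipants component (the real implementation may also clean up chunk picker state):
void removeParticipation(Contact3Handle c, ObjectHandle o) {
	if (o.all_of<Components::SuspectedParticipants>()) {
		o.get<Components::SuspectedParticipants>().participants.erase(c);
	}
}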
bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
std::cerr << "SHA1_NGCFT1: FT1_HAVE s:" << e.chunks.size() << "\n";
if (e.file_kind != static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO)) {
return false;
}
SHA1Digest info_hash{e.file_id};
auto itc_it = _info_to_content.find(info_hash);
if (itc_it == _info_to_content.end()) {
// we are not interested and don't track this
return false;
}
auto o = itc_it->second;
if (!static_cast<bool>(o)) {
std::cerr << "SHA1_NGCFT1 error: tracking info has null object\n";
return false;
}
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
// we might not know yet
addParticipation(c, o);
auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others;
if (!remote_have.contains(c)) {
// init
remote_have.emplace(c, Components::RemoteHave::Entry{false, num_total_chunks});
}
auto& remote_have_peer = remote_have.at(c);
if (!remote_have_peer.have_all) {
assert(remote_have_peer.have.size_bits() >= num_total_chunks);
for (const auto c_i : e.chunks) {
if (c_i >= num_total_chunks) {
std::cerr << "SHA1_NGCFT1 error: remote sent have with out-of-range chunk index!!!\n";
continue;
}
assert(c_i < num_total_chunks);
remote_have_peer.have.set(c_i);
}
// check for completion?
// TODO: optimize
bool test_all {true};
for (size_t i = 0; i < remote_have_peer.have.size_bits(); i++) {
if (!remote_have_peer.have[i]) {
test_all = false;
break;
}
}
if (test_all) {
// optimize
remote_have_peer.have_all = true;
remote_have_peer.have = BitSet{};
}
}
return true;
}
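The completion scan above walks single bits and is tagged TODO: optimize. A byte-wise variant could look like this, assuming the data()/size_bits() accessors used elsewhere in this file:
// possible optimization: compare whole bytes first, then the ragged tail bits
static bool allBitsSet(const BitSet& bs) {
	const size_t full_bytes = bs.size_bits()/8;
	for (size_t i = 0; i < full_bytes; i++) {
		if (bs.data()[i] != 0xff) {
			return false;
		}
	}
	for (size_t i = full_bytes*8; i < bs.size_bits(); i++) {
		if (!bs[i]) {
			return false;
		}
	}
	return true;
}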
bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
std::cerr << "SHA1_NGCFT1: FT1_BITSET o:" << e.start_chunk << " s:" << e.chunk_bitset.size() << "\n";
if (e.file_kind != static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO)) {
return false;
}
if (e.chunk_bitset.empty()) {
// an empty bitset makes no sense, ignore
return false;
}
SHA1Digest info_hash{e.file_id};
auto itc_it = _info_to_content.find(info_hash);
if (itc_it == _info_to_content.end()) {
// we are not interested and don't track this
return false;
}
auto o = itc_it->second;
if (!static_cast<bool>(o)) {
std::cerr << "SHA1_NGCFT1 error: tracking info has null object\n";
return false;
}
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
// bitsets are rounded up to whole bytes, so tolerate up to 7 padding bits
if (num_total_chunks+7 < e.start_chunk + (e.chunk_bitset.size()*8)) {
std::cerr << "SHA1_NGCFT1 error: got bitset.size+start that is larger then number of chunks!!\n";
return false;
}
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
// we might not know yet
addParticipation(c, o);
auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others;
if (!remote_have.contains(c)) {
// init
remote_have.emplace(c, Components::RemoteHave::Entry{false, num_total_chunks});
}
auto& remote_have_peer = remote_have.at(c);
if (!remote_have_peer.have_all) { // TODO: maybe unset with bitset?
BitSet event_bitset{e.chunk_bitset};
remote_have_peer.have.merge(event_bitset, e.start_chunk);
// check for completion?
// TODO: optimize
bool test_all {true};
for (size_t i = 0; i < remote_have_peer.have.size_bits(); i++) {
if (!remote_have_peer.have[i]) {
test_all = false;
break;
}
}
if (test_all) {
// optimize
remote_have_peer.have_all = true;
remote_have_peer.have = BitSet{};
}
}
return true;
}
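merge() ORs the received bits into the locally tracked set at the given chunk offset. Its semantics, sketched bit-by-bit (the real bitset util likely works byte-wise):
// semantic sketch of BitSet::merge(other, bit_offset)
BitSet& BitSet::merge(const BitSet& other, size_t bit_offset) {
	for (size_t i = 0; i < other.size_bits() && bit_offset + i < size_bits(); i++) {
		if (other[i]) {
			set(bit_offset + i);
		}
	}
	return *this;
}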
bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) {
std::cerr << "SHA1_NGCFT1: PC1_ANNOUNCE s:" << e.id.size() << "\n";
// id is file_kind + id
uint32_t file_kind = 0u;
static_assert(SHA1Digest{}.size() == 20);
if (e.id.size() != sizeof(file_kind) + 20) {
// not for us
return false;
}
for (size_t i = 0; i < sizeof(file_kind); i++) {
file_kind |= uint32_t(e.id[i]) << (i*8);
}
if (file_kind != static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO)) {
return false;
}
SHA1Digest hash{e.id.data()+sizeof(file_kind), 20};
// if we track this hash (info for a file), add them to the participants
std::cout << "SHA1_NGCFT1: got ParticipationChatter1 announce from " << e.group_number << ":" << e.peer_number << " for " << hash << "\n";
auto itc_it = _info_to_content.find(hash);
if (itc_it == _info_to_content.end()) {
// we are not interested and don't track this
return false;
}
// add them to participants
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
auto o = itc_it->second;
const bool was_new = addParticipation(c, o);
if (was_new) {
std::cout << "SHA1_NGCFT1: and we where interested!\n";
// we should probably send the bitset back here / add to queue (can be multiple packets)
}
return false;
}

solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp View File

@ -10,6 +10,7 @@
#include <solanaceae/ngc_ft1/ngcft1.hpp>
#include "./ft1_sha1_info.hpp"
#include "./receiving_transfers.hpp"
#include <entt/entity/registry.hpp>
#include <entt/entity/handle.hpp>
@ -21,13 +22,15 @@
#include <mutex>
#include <list>
class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public NGCFT1EventI, public NGCEXTEventI {
ObjectStore2& _os;
// TODO: backend abstraction
Contact3Registry& _cr;
RegistryMessageModel& _rmm;
NGCFT1& _nft;
ToxContactModel2& _tcm;
ToxEventProviderI& _tep;
NGCEXTEventProvider& _neep;
std::minstd_rand _rng {1337*11};
@ -66,27 +69,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
// key is groupid + peerid
entt::dense_map<uint64_t, entt::dense_map<uint8_t, SendingTransfer>> _sending_transfers;
struct ReceivingTransfer {
struct Info {
ObjectHandle content;
// copy of info data
// too large?
std::vector<uint8_t> info_data;
};
struct Chunk {
ObjectHandle content;
std::vector<size_t> chunk_indices;
// or data?
// if memmapped, this would be just a pointer
};
std::variant<Info, Chunk> v;
float time_since_activity {0.f};
};
// key is groupid + peerid
entt::dense_map<uint64_t, entt::dense_map<uint8_t, ReceivingTransfer>> _receiving_transfers;
ReceivingTransfers _receiving_transfers;
// makes request rotate around open content
std::deque<ObjectHandle> _queue_content_want_info;
@ -122,7 +105,9 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
Contact3Registry& cr,
RegistryMessageModel& rmm,
NGCFT1& nft,
ToxContactModel2& tcm
ToxContactModel2& tcm,
ToxEventProviderI& tep,
NGCEXTEventProvider& neep
);
void iterate(float delta);
@ -140,5 +125,12 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
bool onEvent(const Events::NGCFT1_recv_message&) override;
bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override;
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;
bool onEvent(const Events::NGCEXT_ft1_have&) override;
bool onEvent(const Events::NGCEXT_ft1_bitset&) override;
bool onEvent(const Events::NGCEXT_pc1_announce&) override;
};