16 Commits

Author SHA1 Message Date
70620a901b starting the work on hs2 rizzler 2024-12-07 23:34:19 +01:00
231928339e sigma 2024-12-07 11:38:31 +01:00
294c5346ca hs2 send done, but untested 2024-12-06 22:41:05 +01:00
adeaca4efe early out have when we have all 2024-12-06 16:36:02 +01:00
ba809eda43 refactor and test ts start search 2024-12-06 14:11:06 +01:00
b9a7c75d20 hs: progress on the log file generator, fully written but untested
ft: timeout changes and wording fixes
2024-12-05 23:06:44 +01:00
04b6f7925a use cache for group peer to contact lookup (~5% cpu) 2024-12-02 14:27:39 +01:00
5601ad91f5 a bunch of allocation optimizations 2024-12-02 13:08:47 +01:00
741f1428d3 updated spec to 2.1, changed almost everything
other small fixes
2024-11-28 12:59:14 +01:00
34dc01d4dc fix chunk picker round robin actually working
(worked kinda bc of bug before)
make priority dynamic and fix skipping to make it work too
2024-11-22 13:51:48 +01:00
1bf1fbce75 small hs progress 2024-11-22 13:51:25 +01:00
37b92f67c8 respect voice state for receiving file messages
we should also check on send in the future
2024-11-06 10:50:42 +01:00
01c892df8c tweak finishing timer 2024-11-06 10:49:24 +01:00
6eb5826616 split recv and send, they dont share any code (probably) 2024-11-03 18:21:02 +01:00
2e6b15e4ad more hs drafting 2024-11-01 11:31:05 +01:00
63de78aaeb add spec draft to repo 2024-10-31 15:46:48 +01:00
23 changed files with 1059 additions and 231 deletions

View File

@ -43,21 +43,6 @@ target_link_libraries(solanaceae_ngcft1 PUBLIC
########################################
add_library(solanaceae_ngchs2
./solanaceae/ngc_hs2/ngc_hs2.hpp
./solanaceae/ngc_hs2/ngc_hs2.cpp
)
target_include_directories(solanaceae_ngchs2 PUBLIC .)
target_compile_features(solanaceae_ngchs2 PUBLIC cxx_std_17)
target_link_libraries(solanaceae_ngchs2 PUBLIC
solanaceae_ngcft1
solanaceae_tox_contacts
solanaceae_message3
solanaceae_object_store
)
########################################
add_library(solanaceae_sha1_ngcft1
# hacky deps
./solanaceae/ngc_ft1_sha1/mio.hpp
@ -117,8 +102,6 @@ target_link_libraries(solanaceae_sha1_ngcft1 PUBLIC
solanaceae_file2
)
########################################
option(SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING "Build the solanaceae_ngcft1_sha1 tests" OFF)
message("II SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING " ${SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING})
@ -136,3 +119,43 @@ if (SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING)
endif()
########################################
add_library(solanaceae_ngchs2
./solanaceae/ngc_hs2/ts_find_start.hpp
./solanaceae/ngc_hs2/ngc_hs2_sigma.hpp
./solanaceae/ngc_hs2/ngc_hs2_sigma.cpp
./solanaceae/ngc_hs2/ngc_hs2_rizzler.hpp
./solanaceae/ngc_hs2/ngc_hs2_rizzler.cpp
)
target_include_directories(solanaceae_ngchs2 PUBLIC .)
target_compile_features(solanaceae_ngchs2 PUBLIC cxx_std_17)
target_link_libraries(solanaceae_ngchs2 PUBLIC
solanaceae_ngcft1
solanaceae_sha1_ngcft1 # HACK: properly abstract filekind/-id
solanaceae_tox_contacts
solanaceae_message3
solanaceae_object_store
nlohmann_json::nlohmann_json
)
option(SOLANACEAE_NGCHS2_BUILD_TESTING "Build the solanaceae_ngchs2 tests" OFF)
message("II SOLANACEAE_NGCHS2_BUILD_TESTING " ${SOLANACEAE_NGCHS2_BUILD_TESTING})
if (SOLANACEAE_NGCHS2_BUILD_TESTING)
include(CTest)
add_executable(test_hs2_ts_binarysearch
./solanaceae/ngc_hs2/test_ts_binarysearch.cpp
)
target_link_libraries(test_hs2_ts_binarysearch PUBLIC
solanaceae_ngchs2
)
add_test(NAME test_hs2_ts_binarysearch COMMAND test_hs2_ts_binarysearch)
endif()

View File

@ -2,3 +2,13 @@ cmake_minimum_required(VERSION 3.9 FATAL_ERROR)
add_subdirectory(./sha1)
# we are running a custom msgpack serialization for hs2
if (NOT TARGET nlohmann_json::nlohmann_json)
FetchContent_Declare(json
URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz
URL_HASH SHA256=d6c65aca6b1ed68e7a182f4757257b107ae403032760ed6ef121c9d55e81757d
EXCLUDE_FROM_ALL
)
FetchContent_MakeAvailable(json)
endif()

View File

@ -255,6 +255,7 @@ bool NGCEXTEventProvider::parse_ft1_data_ack(
_DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
e.transfer_id = data[curser++];
e.sequence_ids.reserve(std::max<int64_t>(data_size-curser, 1)/sizeof(uint16_t));
while (curser < data_size) {
_DATA_HAVE(sizeof(uint16_t), std::cerr << "NGCEXT: packet too small, missing seq_id\n"; return false)
uint16_t seq_id = data[curser++];

View File

@ -61,10 +61,6 @@ struct CCAI {
// returns -1 if not implemented, can return 0
virtual int64_t inFlightBytes(void) const { return -1; }
// returns -1 if not implemented, can return 0
// excluded timed out packets (not those currently resent)
virtual int64_t inFlightBytesAccounted(void) const { return -1; }
public: // callbacks
// data size is without overhead
virtual void onSent(SeqIDType seq, size_t data_size) = 0;

View File

@ -93,8 +93,7 @@ int64_t CUBIC::canSend(float time_delta) {
}
const auto window = getCWnD();
//int64_t cspace_bytes = window - _in_flight_bytes;
int64_t cspace_bytes = window - _in_flight_bytes_accounted;
int64_t cspace_bytes = window - _in_flight_bytes;
if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) {
//std::cerr << "CUBIC: cspace < seg size\n";
return 0u;

View File

@ -29,25 +29,6 @@ void FlowOnly::updateWindow(void) {
_fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
}
void FlowOnly::updateAccounted(void) {
int64_t size_timedout {0};
{ // can be expensive
// code see getTimeouts()
// after 3 rtt delay, we trigger timeout
const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;
for (const auto& [seq, time_stamp, size, _] : _in_flight) {
if (now_adjusted > time_stamp) {
//list.push_back(seq);
size_timedout += size;
}
}
}
_in_flight_bytes_accounted = _in_flight_bytes - size_timedout;
}
void FlowOnly::updateCongestion(void) {
updateWindow();
const auto tmp_window = getWindow();
@ -89,10 +70,8 @@ int64_t FlowOnly::canSend(float time_delta) {
}
updateWindow();
updateAccounted();
//int64_t fspace = _fwnd - _in_flight_bytes;
int64_t fspace = _fwnd - _in_flight_bytes_accounted;
int64_t fspace = _fwnd - _in_flight_bytes;
if (fspace < MAXIMUM_SEGMENT_DATA_SIZE) {
return 0u;
}
@ -107,6 +86,7 @@ int64_t FlowOnly::canSend(float time_delta) {
std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
std::vector<SeqIDType> list;
list.reserve(_in_flight.size()/3); // we dont know, so we just guess
// after 3 rtt delay, we trigger timeout
const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;
@ -128,10 +108,6 @@ int64_t FlowOnly::inFlightBytes(void) const {
return _in_flight_bytes;
}
int64_t FlowOnly::inFlightBytesAccounted(void) const {
return _in_flight_bytes_accounted;
}
void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
if constexpr (true) {
size_t sum {0u};

View File

@ -12,7 +12,7 @@ struct FlowOnly : public CCAI {
public: // config
static constexpr float RTT_EMA_ALPHA = 0.001f; // might need change over time
static constexpr float RTT_UP_MAX = 3.0f; // how much larger a delay can be to be taken into account
static constexpr float RTT_MAX = 2.f; // 2 sec is probably too much
static constexpr float RTT_MAX = 2.f; // maybe larger for tunneled connections
protected:
// initialize to low value, will get corrected very fast
@ -32,7 +32,6 @@ struct FlowOnly : public CCAI {
};
std::vector<FlyingBunch> _in_flight;
int64_t _in_flight_bytes {0};
int64_t _in_flight_bytes_accounted {0};
int32_t _consecutive_events {0};
@ -59,8 +58,6 @@ struct FlowOnly : public CCAI {
void updateWindow(void);
void updateAccounted(void);
virtual void onCongestion(void) {};
// internal logic, calls the onCongestion() event
@ -80,7 +77,6 @@ struct FlowOnly : public CCAI {
int64_t inFlightCount(void) const override;
int64_t inFlightBytes(void) const override;
int64_t inFlightBytesAccounted(void) const override;
public: // callbacks
// data size is without overhead

View File

@ -56,7 +56,9 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
} else {
#if 0 // too spammy
std::cerr << "NGCFT1 warning: no space to resend timedout\n";
#endif
}
}
});
@ -625,7 +627,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
// TODO: keep around for remote timeout + delay + offset, so we can be sure all acks where received
// or implement a dedicated finished that needs to be acked
transfer.finishing_timer = 0.75f; // TODO: we are receiving, we dont know delay
//transfer.finishing_timer = 0.75f; // TODO: we are receiving, we dont know delay
transfer.finishing_timer = FlowOnly::RTT_MAX;
dispatch(
NGCFT1_Event::recv_done,
@ -669,6 +672,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
{
std::vector<CCAI::SeqIDType> seqs;
seqs.reserve(e.sequence_ids.size());
for (const auto it : e.sequence_ids) {
// TODO: improve this o.o
seqs.push_back({e.transfer_id, it});

View File

@ -16,7 +16,7 @@ enum class NGCFT1_file_kind : uint32_t {
// id: TOX_FILE_ID_LENGTH (32) bytes
// this is basically and id and probably not a hash, like the tox friend api
// this id can be unique between 2 peers
ID = 8u,
ID = 8u, // TODO: this is actually DATA and 0
// id: hash of the info, like a torrent infohash (using the same hash as the data)
// TODO: determain internal format
@ -75,20 +75,7 @@ enum class NGCFT1_file_kind : uint32_t {
// https://gist.github.com/Green-Sky/440cd9817a7114786850eb4c62dc57c3
// id: ts start, ts end
// content:
// - ts start (do we need this? when this is part of the id?)
// - ts end (same)
// - list size
// - ppk
// - mid
// - ts
HS2_INFO_RANGE_TIME = 0x00000f00,
// TODO: half open ranges
// TODO: id based
// TODO: ppk based?
// id: ppk, mid, ts
HS2_SINGLE_MESSAGE,
// TODO: message pack
HS2_RANGE_TIME = 0x00000f00, // TODO: remove, did not survive
HS2_RANGE_TIME_MSGPACK = 0x00000f02,
};

View File

@ -133,6 +133,7 @@ void ChunkPicker::updateParticipation(
entt::dense_set<Object> checked;
for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) {
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
const ObjectHandle o {objreg, ov};
if (participating_unfinished.contains(o)) {
@ -150,6 +151,21 @@ void ChunkPicker::updateParticipation(
participating_unfinished.erase(o);
continue;
}
// TODO: optimize this to only change on dirty, or something
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
Priority prio = o.get<ObjComp::Ephemeral::File::DownloadPriority>().p;
uint16_t pskips =
prio == Priority::HIGHEST ? 0u :
prio == Priority::HIGH ? 1u :
prio == Priority::NORMAL ? 2u :
prio == Priority::LOW ? 4u :
8u // LOWEST
;
participating_unfinished.at(o).should_skip = pskips;
}
} else {
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
continue;
@ -160,8 +176,6 @@ void ChunkPicker::updateParticipation(
}
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
//using Priority = Components::DownloadPriority::Priority;
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
Priority prio = Priority::NORMAL;
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
@ -239,13 +253,12 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
// round robin content (remember last obj)
if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
participating_in_last = participating_unfinished.begin()->first;
//participating_in_last = *participating_unfinished.begin();
}
assert(objreg.valid(participating_in_last));
auto it = participating_unfinished.find(participating_in_last);
// hard limit robin rounds to array size time 20
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++) {
// hard limit robin rounds to array size times 20
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++, it++) {
if (it == participating_unfinished.end()) {
it = participating_unfinished.begin();
}
@ -254,6 +267,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
it->second.skips++;
continue;
}
it->second.skips = 0;
ObjectHandle o {objreg, it->first};
@ -361,7 +375,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
}
}
if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) {
if (it == participating_unfinished.end()) {
participating_in_last = entt::null;
} else {
participating_in_last = it->first;

View File

@ -63,6 +63,7 @@ size_t FT1InfoSHA1::chunkSize(size_t chunk_index) const {
std::vector<uint8_t> FT1InfoSHA1::toBuffer(void) const {
std::vector<uint8_t> buffer;
buffer.reserve(256+8+4+20*chunks.size());
assert(!file_name.empty());
// TODO: optimize

View File

@ -7,8 +7,8 @@ void ReceivingTransfers::tick(float delta) {
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
it->second.time_since_activity += delta;
// if we have not heard for 20sec, timeout
if (it->second.time_since_activity >= 20.f) {
// if we have not heard for 60sec, timeout
if (it->second.time_since_activity >= 60.f) {
std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n";
// TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading
//it->second.v

View File

@ -250,7 +250,7 @@ float SHA1_NGCFT1::iterate(float delta) {
//std::cerr << "---------- new tick ----------\n";
_mfb.tick(); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast
entt::dense_map<Contact3, size_t> peer_open_requests;
_peer_open_requests.clear();
{ // timers
// sending transfers
@ -299,7 +299,7 @@ float SHA1_NGCFT1::iterate(float delta) {
}
}
{ // requested chunk timers
_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta, &peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
_os.registry().view<Components::FT1ChunkSHA1Requested>().each([this, delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
it->second.timer += delta;
@ -307,7 +307,7 @@ float SHA1_NGCFT1::iterate(float delta) {
if (it->second.timer >= 60.f) {
it = ftchunk_requested.chunks.erase(it);
} else {
peer_open_requests[it->second.c] += 1;
_peer_open_requests[it->second.c] += 1;
it++;
}
}
@ -447,7 +447,7 @@ float SHA1_NGCFT1::iterate(float delta) {
Systems::chunk_picker_updates(
_cr,
_os.registry(),
peer_open_requests,
_peer_open_requests,
_receiving_transfers,
_nft,
delta
@ -456,7 +456,7 @@ float SHA1_NGCFT1::iterate(float delta) {
// transfer statistics systems
Systems::transfer_tally_update(_os.registry(), getTimeNow());
if (peer_open_requests.empty()) {
if (_peer_open_requests.empty()) {
return 2.f;
} else {
// pretty conservative and should be ajusted on a per peer, per delay basis
@ -717,7 +717,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
}
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
if (e.file_id_size != 20) {
// error
@ -735,7 +735,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
{ // they advertise interest in the content
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
@ -796,7 +796,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
e.accept = true;
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
SHA1Digest sha1_chunk_hash {e.file_id, e.file_id_size};
@ -809,7 +809,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
{ // they have the content (probably, might be fake, should move this to done)
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
@ -868,7 +868,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
if (!_receiving_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
std::cerr << "SHA1_NGCFT1 waring: unknown transfer " << e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n";
std::cerr << "SHA1_NGCFT1 warning: unknown transfer " << (int)e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n";
return false;
}
@ -955,7 +955,16 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
// TODO: add event to propergate to messages
//_rmm.throwEventUpdate(transfer); // should we?
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
Contact3Handle c;
const auto tpcc_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
if (tpcc_it != _tox_peer_to_contact.cend()) {
c = tpcc_it->second;
} else {
c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (static_cast<bool>(c)) {
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c;
}
}
if (static_cast<bool>(c)) {
chunk_transfer.content.get_or_emplace<Components::TransferStatsTally>()
.tally[c]
@ -1165,6 +1174,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
return false;
}
// TODO: make perms go though contacts
// TODO: consider temporal component? not here, here is now
if (!_tcm.groupPeerCanSpeak(e.group_number, e.peer_number)) {
// peer has not the permission to speak, discard
return false; // return true?
}
uint64_t ts = Message::getTimeMS();
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
@ -1383,11 +1399,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
return false;
}
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
assert(static_cast<bool>(c));
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
// we might not know yet
if (addParticipation(c, o)) {
@ -1395,6 +1409,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
//c.emplace_or_replace<ChunkPickerUpdateTag>();
}
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
if (!remote_have.contains(c)) {
// init
@ -1404,6 +1420,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
//c.emplace_or_replace<ChunkPickerUpdateTag>();
}
if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
return true; // we dont care beyond this point
}
auto& remote_have_peer = remote_have.at(c);
if (remote_have_peer.have_all) {
return true; // peer somehow already had all, ignoring
@ -1485,7 +1505,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
assert(static_cast<bool>(c));
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
// we might not know yet
addParticipation(c, o);
@ -1548,7 +1568,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have_all& e) {
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
assert(static_cast<bool>(c));
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
// we might not know yet
addParticipation(c, o);
@ -1595,7 +1615,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) {
// add to participants
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
auto o = itc_it->second;
if (addParticipation(c, o)) {
// something happend, update chunk picker

View File

@ -74,6 +74,9 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
// only used to remove participation on peer exit
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
// reset every iterate; kept here as an allocation optimization
entt::dense_map<Contact3, size_t> _peer_open_requests;
void updateMessages(ObjectHandle ce);
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ObjectHandle ce);

View File

@ -1,91 +0,0 @@
#include "./ngc_hs2.hpp"
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
// History-sync v2 entry point (before the sigma/rizzler split):
// subscribes to group peer join/exit and to all file-transfer events
// needed to both serve and request chat-log ranges.
NGCHS2::NGCHS2(
	ToxContactModel2& tcm,
	ToxEventProviderI& tep,
	NGCFT1& nft
) :
	_tcm(tcm),
	_tep_sr(tep.newSubRef(this)),
	_nft(nft),
	_nftep_sr(_nft.newSubRef(this))
{
	_tep_sr
		.subscribe(TOX_EVENT_GROUP_PEER_JOIN)
		.subscribe(TOX_EVENT_GROUP_PEER_EXIT)
	;

	_nftep_sr
		// BUG(fixed): recv_init was subscribed twice
		.subscribe(NGCFT1_Event::recv_init)
		.subscribe(NGCFT1_Event::recv_request)
		.subscribe(NGCFT1_Event::recv_data)
		.subscribe(NGCFT1_Event::send_data)
		.subscribe(NGCFT1_Event::recv_done)
		.subscribe(NGCFT1_Event::send_done)
	;
}
// Nothing to release explicitly; the SubscriptionReference members
// unsubscribe themselves on destruction.
NGCHS2::~NGCHS2(void) {
}
// No periodic work implemented yet; always reports a long idle interval.
// NOTE(review): the unit of the return value (ms vs s) is not visible
// here — confirm against the caller.
float NGCHS2::iterate(float delta) {
return 1000.f;
}
// Stub: filters for the HS2 file kinds, but handling of matching
// requests is not implemented yet — the event is never consumed.
bool NGCHS2::onEvent(const Events::NGCFT1_recv_request& e) {
if (
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
) {
return false; // not for us
}
// TODO: actually handle the request
return false;
}
// Stub: same kind filter as recv_request; accepting/tracking the
// incoming transfer is not implemented yet.
bool NGCHS2::onEvent(const Events::NGCFT1_recv_init& e) {
if (
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
) {
return false; // not for us
}
// TODO: accept and track the transfer
return false;
}
// Stub: incoming data chunks are ignored for now.
bool NGCHS2::onEvent(const Events::NGCFT1_recv_data&) {
return false;
}
// Stub: outgoing data requests are ignored for now.
bool NGCHS2::onEvent(const Events::NGCFT1_send_data&) {
return false;
}
// Stub: transfer-finished (receiving) notifications are ignored for now.
bool NGCHS2::onEvent(const Events::NGCFT1_recv_done&) {
return false;
}
// Stub: transfer-finished (sending) notifications are ignored for now.
bool NGCHS2::onEvent(const Events::NGCFT1_send_done&) {
return false;
}
// Resolves the joining peer to a contact. The intended follow-up
// (scheduling a sync check) is not implemented yet.
bool NGCHS2::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
const auto group_number = tox_event_group_peer_join_get_group_number(e);
const auto peer_number = tox_event_group_peer_join_get_peer_id(e);
const auto c = _tcm.getContactGroupPeer(group_number, peer_number);
// assumes the contact model already knows the peer at join time
assert(c);
// add to check list with initial cooldown
return false;
}
// Stub: peer-exit cleanup not implemented yet.
bool NGCHS2::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
return false;
}

View File

@ -1,44 +0,0 @@
#pragma once
//#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/toxcore/tox_event_interface.hpp>
//#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
// fwd
class ToxContactModel2;
// History-sync v2 module (pre-split): listens to tox group peer events
// and to NGCFT1 transfer events.
class NGCHS2 : public ToxEventI, public NGCFT1EventI {
ToxContactModel2& _tcm;
//Contact3Registry& _cr;
//RegistryMessageModelI& _rmm;
ToxEventProviderI::SubscriptionReference _tep_sr;
NGCFT1& _nft;
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
public:
NGCHS2(
ToxContactModel2& tcm,
ToxEventProviderI& tep,
NGCFT1& nf
);
~NGCHS2(void);
// Periodic tick; returns the desired interval until the next call.
// NOTE(review): declaration names the last ctor param "nf" while the
// definition uses "nft" — harmless, but worth unifying.
float iterate(float delta);
protected:
bool onEvent(const Events::NGCFT1_recv_request&) override;
bool onEvent(const Events::NGCFT1_recv_init&) override;
bool onEvent(const Events::NGCFT1_recv_data&) override;
bool onEvent(const Events::NGCFT1_send_data&) override;
bool onEvent(const Events::NGCFT1_recv_done&) override;
bool onEvent(const Events::NGCFT1_send_done&) override;
protected:
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;
};

View File

@ -0,0 +1,123 @@
#include "./ngc_hs2_rizzler.hpp"
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
#include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
#include <iostream>
// Requesting ("rizzler") side of history-sync v2: asks peers for
// chat-log ranges. Only the receiving transfer events are subscribed.
// NOTE(review): no ToxEventProviderI subscription is set up here even
// though onToxEvent(Join) is implemented — presumably wired elsewhere;
// confirm peer-join events actually reach this class.
NGCHS2Rizzler::NGCHS2Rizzler(
Contact3Registry& cr,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
NGCFT1& nft
) :
_cr(cr),
_rmm(rmm),
_tcm(tcm),
_nft(nft),
_nftep_sr(_nft.newSubRef(this))
{
_nftep_sr
.subscribe(NGCFT1_Event::recv_init)
.subscribe(NGCFT1_Event::recv_data)
.subscribe(NGCFT1_Event::recv_done)
;
}
// Nothing to clean up; the subscription reference unsubscribes itself.
NGCHS2Rizzler::~NGCHS2Rizzler(void) {
}
// Walk the pending-request queue and fire a history request for every
// peer whose delay has elapsed. Entries for peers that went offline
// (or for which sending fails) are dropped from the queue.
float NGCHS2Rizzler::iterate(float delta) {
	auto queue_it = _request_queue.begin();
	while (queue_it != _request_queue.end()) {
		auto& info = queue_it->second;
		info.timer += delta;

		if (info.timer < info.delay) {
			// not due yet
			++queue_it;
			continue;
		}

		if (!_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(queue_it->first)) {
			// peer no longer online
			queue_it = _request_queue.erase(queue_it);
			continue;
		}

		const auto [group_number, peer_number] = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(queue_it->first);

		// timestamps are handled in seconds here
		const uint64_t ts_now = Message::getTimeMS()/1000;

		if (!sendRequest(group_number, peer_number, ts_now, ts_now-(60*48))) {
			// on failure, assume disconnected
			queue_it = _request_queue.erase(queue_it);
			continue;
		}

		// TODO: requeue with a longer (minutes) randomized delay
		// TODO: segment the range
		// TODO: dont request already received ranges
		++queue_it;
	}

	return 1000.f;
}
// Stub: only logs the attempt and reports failure, so callers treat
// the peer as unreachable. Actual request sending is not implemented.
bool NGCHS2Rizzler::sendRequest(
	uint32_t group_number, uint32_t peer_number,
	uint64_t ts_start, uint64_t ts_end
) {
	std::cout
		<< "NGCHS2Rizzler: sending request to "
		<< group_number << ":" << peer_number
		<< " (" << ts_start << "," << ts_end << ")\n";

	return false;
}
// Stub: incoming transfer offers are not accepted yet.
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_init&) {
return false;
}
// Stub: incoming data is not consumed yet.
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_data&) {
return false;
}
// Stub: completion of received transfers is not handled yet.
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_done&) {
return false;
}
bool NGCHS2Rizzler::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
const auto group_number = tox_event_group_peer_join_get_group_number(e);
const auto peer_number = tox_event_group_peer_join_get_peer_id(e);
const auto c = _tcm.getContactGroupPeer(group_number, peer_number);
if (!c) {
return false;
}
if (!_request_queue.count(c)) {
_request_queue[c] = {
_delay_before_first_request_min + _rng_dist(_rng)*_delay_before_first_request_add,
0.f,
0,
};
}
return false;
}

View File

@ -0,0 +1,60 @@
#pragma once
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
// fwd
class ToxContactModel2;
class RegistryMessageModelI;
// Requesting ("rizzler") side of history-sync v2: queues a per-peer
// history request shortly after a peer joins and receives the
// resulting range transfers.
// NOTE(review): std::map, std::uniform_real_distribution and
// std::minstd_rand are used without directly including <map>/<random>;
// relies on transitive includes — consider adding them.
class NGCHS2Rizzler : public ToxEventI, public NGCFT1EventI {
	Contact3Registry& _cr;
	RegistryMessageModelI& _rmm;
	ToxContactModel2& _tcm;

	NGCFT1& _nft;
	NGCFT1EventProviderI::SubscriptionReference _nftep_sr;

	// 5s-6s randomized delay before the first request to a fresh peer
	const float _delay_before_first_request_min {5.f};
	const float _delay_before_first_request_add {1.f};

	std::uniform_real_distribution<float> _rng_dist {0.0f, 1.0f};
	std::minstd_rand _rng;

	struct RequestQueueInfo {
		float delay; // const
		float timer;
		uint64_t sync_delta; //?
	};
	// request queue
	// c -> delay, timer
	std::map<Contact3, RequestQueueInfo> _request_queue;

	public:
		NGCHS2Rizzler(
			Contact3Registry& cr,
			RegistryMessageModelI& rmm,
			ToxContactModel2& tcm,
			NGCFT1& nft // renamed from "nf" to match the definition
		);

		~NGCHS2Rizzler(void);

		// Periodic tick; fires due requests. Returns desired idle interval.
		float iterate(float delta);

	protected:
		// Sends a history request for [ts_start, ts_end] (seconds);
		// returns false on failure.
		bool sendRequest(
			uint32_t group_number, uint32_t peer_number,
			uint64_t ts_start, uint64_t ts_end
		);

	protected:
		bool onEvent(const Events::NGCFT1_recv_init&) override;
		bool onEvent(const Events::NGCFT1_recv_data&) override;
		bool onEvent(const Events::NGCFT1_recv_done&) override;

	protected:
		bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
};

View File

@ -0,0 +1,423 @@
#include "./ngc_hs2_sigma.hpp"
#include <solanaceae/util/span.hpp>
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
#include <solanaceae/contact/components.hpp>
#include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/message3/components.hpp>
#include <solanaceae/tox_messages/msg_components.hpp>
//#include <solanaceae/tox_messages/obj_components.hpp>
// TODO: this is kinda bad, needs improvement
// use tox fileid/filekind instead !
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
#include <solanaceae/ngc_ft1_sha1/components.hpp>
#include <nlohmann/json.hpp>
#include "./ts_find_start.hpp"
#include <iostream>
// https://www.youtube.com/watch?v=AdAqsgga3qo
namespace Components {

	// Enqueue a new incoming time-range request unless an identical
	// (exact ts_start+ts_end match) request is already pending.
	void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid) {
		// TODO: do more than exact dedupe
		for (const auto& [time_range, _] : _queue) {
			const bool same_range =
				time_range.ts_start == new_request.ts_start &&
				time_range.ts_end == new_request.ts_end
			;
			if (same_range) {
				return; // already enqueued
			}
		}

		_queue.emplace_back(Entry{new_request, std::vector<uint8_t>{fid.cbegin(), fid.cend()}});
	}

} // Components
// Serving ("sigma") side of history-sync v2: answers peers' chat-log
// range requests. Only the sending-side transfer events are subscribed.
NGCHS2Sigma::NGCHS2Sigma(
Contact3Registry& cr,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
NGCFT1& nft
) :
_cr(cr),
_rmm(rmm),
_tcm(tcm),
_nft(nft),
_nftep_sr(_nft.newSubRef(this))
{
_nftep_sr
.subscribe(NGCFT1_Event::recv_request)
.subscribe(NGCFT1_Event::send_data)
.subscribe(NGCFT1_Event::send_done)
;
}
// Nothing to clean up; the subscription reference unsubscribes itself.
NGCHS2Sigma::~NGCHS2Sigma(void) {
}
// Periodic tick of the serving side:
//  1) (rate-limited by _iterate_heat) starts at most one outgoing
//     range transfer per queued peer, self-weak contacts first,
//  2) times out running transfers with no activity for 60s.
// Returns the desired idle interval.
float NGCHS2Sigma::iterate(float delta) {
	// limit how often we update here (new fts usually)
	if (_iterate_heat > 0.f) {
		_iterate_heat -= delta;
		return 1000.f; // return heat?
	} else {
		_iterate_heat = _iterate_cooldown;
	}

	// work request queue
	// check if already running, discard
	auto fn_iirq = [this](auto&& view) {
		for (auto&& [cv, iirq] : view.each()) {
			// BUG(fixed): the queue component outlives its last entry
			// (pop_front below does not remove the component), so
			// front() on an empty deque was possible — UB.
			if (iirq._queue.empty()) {
				continue;
			}

			Contact3Handle c{_cr, cv};
			auto& iirr = c.get_or_emplace<Components::IncommingTimeRangeRequestRunning>();

			// dedup queued from running

			if (iirr._list.size() >= _max_parallel_per_peer) {
				continue;
			}

			// new ft here
			// TODO: loop? nah just 1 per tick is enough
			const auto request_entry = iirq._queue.front(); // copy
			assert(!request_entry.fid.empty());

			if (!c.all_of<Contact::Components::Parent>()) {
				iirq._queue.pop_front();
				continue; // how
			}
			const Contact3Handle group_c = {*c.registry(), c.get<Contact::Components::Parent>().parent};
			if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
				iirq._queue.pop_front();
				continue;
			}
			const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();

			// TODO: check allowed range here
			//_max_time_into_past_default

			// potentially heavy op
			auto data = buildChatLogFileRange(group_c, request_entry.ir.ts_start, request_entry.ir.ts_end);

			uint8_t transfer_id {0};
			if (!_nft.NGC_FT1_send_init_private(
				group_number, peer_number,
				(uint32_t)NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK,
				request_entry.fid.data(), request_entry.fid.size(),
				data.size(),
				&transfer_id,
				true // can_compress (does nothing rn)
			)) {
				// sending failed, we do not pop but wait for next iterate
				// TODO: cache data
				// TODO: fail counter
				// actually, fail probably means offline, so delete?
				continue;
			}

			assert(iirr._list.count(transfer_id) == 0);
			// move: data is not used after this point (perf, avoids a copy)
			iirr._list[transfer_id] = {request_entry.ir, std::move(data)};

			iirq._queue.pop_front();
		}
	};

	// first handle range requests on weak self
	fn_iirq(_cr.view<Components::IncommingTimeRangeRequestQueue, Contact::Components::TagSelfWeak>());

	// we could stop here, if too much is already running

	// then range on others
	fn_iirq(_cr.view<Components::IncommingTimeRangeRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));

	_cr.view<Components::IncommingTimeRangeRequestRunning>().each(
		[delta](const auto cv, Components::IncommingTimeRangeRequestRunning& irr) {
			std::vector<uint8_t> to_remove;
			for (auto&& [ft_id, entry] : irr._list) {
				entry.last_activity += delta;
				if (entry.last_activity >= 60.f) {
					to_remove.push_back(ft_id);
				}
			}
			for (const auto it : to_remove) {
				std::cout << "NGCHS2Sigma warning: timed out ." << (int)it << "\n";
				// TODO: need a way to tell ft?
				irr._list.erase(it);
				// technically we are not supposed to timeout and instead rely on the done event
			}
		}
	);

	return 1000.f;
}
// Deserialize an unsigned integer of type `Type` from little-endian bytes.
// Throws (int) if `bytes` holds fewer than sizeof(Type) bytes.
// Return value is widened to uint64_t so all callers share one signature.
template<typename Type>
static uint64_t deserlSimpleType(ByteSpan bytes) {
	if (bytes.size < sizeof(Type)) {
		throw int(1);
	}

	Type value{}; // must be zero-initialized: |= on an indeterminate value is UB
	for (size_t i = 0; i < sizeof(Type); i++) {
		value |= Type(bytes[i]) << (i*8);
	}

	return value;
}
//static uint32_t deserlMID(ByteSpan mid_bytes) {
// return deserlSimpleType<uint32_t>(mid_bytes);
//}
// Deserialize a 64bit little-endian timestamp from `ts_bytes`.
// Throws (via deserlSimpleType) if fewer than 8 bytes are supplied.
static uint64_t deserlTS(ByteSpan ts_bytes) {
	return deserlSimpleType<uint64_t>(ts_bytes);
}
// Parse an HS2_RANGE_TIME_MSGPACK request's file id (two 64bit little-endian
// timestamps: start then end) and queue the range for servicing in iterate().
// Malformed requests are logged and dropped.
void NGCHS2Sigma::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
	ByteSpan fid{e.file_id, e.file_id_size};
	// TODO: better size check
	if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
		std::cerr << "NGCHS2S error: range not large enough\n";
		return;
	}

	// seconds (NOTE(review): the spec table says deciseconds — confirm units)
	uint64_t ts_start{0};
	uint64_t ts_end{0};

	// parse
	try {
		ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)};
		ts_start = deserlTS(ts_start_bytes);

		ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)};
		ts_end = deserlTS(ts_end_bytes);
	} catch (...) {
		std::cerr << "NGCHS2S error: failed to parse range\n";
		return;
	}

	// dedupe insert into queue
	// how much overlap do we allow?
	c.get_or_emplace<Components::IncommingTimeRangeRequestQueue>().queueRequest(
		{ts_start, ts_end},
		fid
	);
}
// Serialize the public chat messages of contact/group `c` whose timestamp is
// in [ts_end, ts_start] into a msgpack array (HS2 "time range msgpack").
// ts_start is the newest bound, ts_end the oldest; the message view is walked
// newest-to-oldest. Returns an empty vector if there is no registry/storage.
// Timestamps appear to be milliseconds here (converted to deciseconds below)
// — TODO confirm against the request units in handleTimeRange.
std::vector<uint8_t> NGCHS2Sigma::buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end) {
	const Message3Registry* reg_ptr = static_cast<const RegistryMessageModelI&>(_rmm).get(c);
	if (reg_ptr == nullptr) {
		return {};
	}
	const Message3Registry& msg_reg = *reg_ptr;

	if (msg_reg.storage<Message::Components::Timestamp>() == nullptr) {
		// nothing to do here
		return {};
	}

	//std::cout << "!!!! starting msg ts search, ts_start:" << ts_start << " ts_end:" << ts_end << "\n";

	auto ts_view = msg_reg.view<Message::Components::Timestamp>();

	// we iterate "forward", so from newest to oldest

	// start is the newest ts
	const auto ts_start_it = find_start_by_ts(ts_view, ts_start);
	// end is the oldest ts
	// we only search for the start point, because we walk to the end anyway

	auto j_array = nlohmann::json::array_t{};

	// hmm
	// maybe use other view or something?
	for (auto it = ts_start_it; it != ts_view.end(); it++) {
		const auto e = *it;
		const auto& [ts_comp] = ts_view.get(e);

		// defensive bounds check; find_start_by_ts should already exclude newer
		if (ts_comp.ts > ts_start) {
			std::cerr << "!!!! msg ent in view too new\n";
			continue;
		} else if (ts_comp.ts < ts_end) {
			// too old, we hit the end of the range
			break;
		}

		// only messages that can be fully identified (ts+ppk+mid) are sendable
		if (!msg_reg.all_of<
			Message::Components::ContactFrom,
			Message::Components::ContactTo,
			Message::Components::ToxGroupMessageID
		>(e)) {
			continue; // ??
		}
		if (!msg_reg.any_of<Message::Components::MessageText, Message::Components::MessageFileObject>(e)) {
			continue; // skip
		}

		const auto& [c_from_c, c_to_c] = msg_reg.get<Message::Components::ContactFrom, Message::Components::ContactTo>(e);

		if (c_to_c.c != c) {
			// message was not public
			continue;
		}

		if (!_cr.valid(c_from_c.c)) {
			continue; // ???
		}

		Contact3Handle c_from{_cr, c_from_c.c};

		// sender must have a persistent group key, it becomes the "ppk" field
		if (!c_from.all_of<Contact::Components::ToxGroupPeerPersistent>()) {
			continue; // ???
		}

		// only forward messages we observed ourself (higher credibility)
		if (_only_send_self_observed && msg_reg.all_of<Message::Components::SyncedBy>(e) && c.all_of<Contact::Components::Self>()) {
			if (!msg_reg.get<Message::Components::SyncedBy>(e).ts.count(c.get<Contact::Components::Self>().self)) {
				continue; // did not observe ourselfs, skip
			}
		}

		auto j_entry = nlohmann::json::object_t{};

		j_entry["ts"] = ts_comp.ts/100; // millisec -> decisec
		{
			const auto& ppk_ref = c_from.get<Contact::Components::ToxGroupPeerPersistent>().peer_key.data;
			j_entry["ppk"] = nlohmann::json::binary_t{std::vector<uint8_t>{ppk_ref.cbegin(), ppk_ref.cend()}};
		}
		j_entry["mid"] = msg_reg.get<Message::Components::ToxGroupMessageID>(e).id;

		if (msg_reg.all_of<Message::Components::MessageText>(e)) {
			if (msg_reg.all_of<Message::Components::TagMessageIsAction>(e)) {
				j_entry["msgtype"] = "action"; // TODO: textaction?
			} else {
				j_entry["msgtype"] = "text";
			}
			j_entry["text"] = msg_reg.get<Message::Components::MessageText>(e).text;
		} else if (msg_reg.any_of<Message::Components::MessageFileObject>(e)) {
			const auto& o = msg_reg.get<Message::Components::MessageFileObject>(e).o;
			if (!o) {
				continue;
			}

			// HACK: use tox fild_id and file_kind instead!!
			if (o.all_of<Components::FT1InfoSHA1Hash>()) {
				j_entry["fkind"] = NGCFT1_file_kind::HASH_SHA1_INFO;
				j_entry["fid"] = nlohmann::json::binary_t{o.get<Components::FT1InfoSHA1Hash>().hash};
			} else {
				continue; // unknown file type
			}
		}

		j_array.push_back(j_entry);
	}

	return nlohmann::json::to_msgpack(j_array);
}
// Required by RegistryMessageModelEventI; message construction is not consumed here.
bool NGCHS2Sigma::onEvent(const Message::Events::MessageConstruct&) {
	return false;
}
// Required by RegistryMessageModelEventI; message updates are not consumed here.
bool NGCHS2Sigma::onEvent(const Message::Events::MessageUpdated&) {
	return false;
}
// Required by RegistryMessageModelEventI; message destruction is not consumed here.
bool NGCHS2Sigma::onEvent(const Message::Events::MessageDestory&) {
	return false;
}
// Entry point for incoming ft requests: accepts HS2 time-range requests
// from known group peers and hands them to handleTimeRange().
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_recv_request& e) {
	// only the time-range file kind belongs to us
	if (e.file_kind != NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK) {
		return false; // not for us
	}

	// TODO: when is it done from queue?

	auto requester = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
	if (!requester) {
		return false; // how
	}

	// is other peer allowed to make requests
	//bool quick_allow {false};
	bool quick_allow {true}; // HACK: disable all restrictions for this early test
	// TODO: quick deny?
	{
		// - tagged as weakself
		if (!quick_allow && requester.all_of<Contact::Components::TagSelfWeak>()) {
			quick_allow = true;
		}

		// - sub perm level??
		// - out of max time range (ft specific, not a quick_allow)
	}

	if (e.file_kind == NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK) {
		handleTimeRange(requester, e);
	}

	return true;
}
// Feed the ft layer the requested slice of an in-memory chatlog transfer.
// Returns false if the event does not belong to one of our running transfers.
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_send_data& e) {
	auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
	if (!c) {
		return false;
	}

	if (!c.all_of<Components::IncommingTimeRangeRequestRunning>()) {
		return false;
	}

	auto& irr = c.get<Components::IncommingTimeRangeRequestRunning>();
	if (!irr._list.count(e.transfer_id)) {
		return false; // not for us (maybe)
	}

	auto& transfer = irr._list.at(e.transfer_id);
	if (transfer.data.size() < e.data_offset+e.data_size) {
		std::cerr << "NGCHS2Sigma error: ft send data larger then file???\n";
		assert(false && "how");
		// in release builds the assert is compiled out; bail instead of
		// reading past the end of the buffer in the memcpy below
		return false;
	}
	std::memcpy(e.data, transfer.data.data()+e.data_offset, e.data_size);
	transfer.last_activity = 0.f; // reset the iterate() timeout clock

	return true;
}
// A transfer finished: drop it from the peer's running list and log.
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_send_done& e) {
	auto peer_c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
	if (!peer_c || !peer_c.all_of<Components::IncommingTimeRangeRequestRunning>()) {
		return false;
	}

	auto& running = peer_c.get<Components::IncommingTimeRangeRequestRunning>();
	// erase returns the number of removed entries; zero means unknown transfer
	if (running._list.erase(e.transfer_id) == 0) {
		return false; // not for us (maybe)
	}

	// TODO: check if we completed it
	std::cout << "NGCHS2Sigma: sent chatlog to " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";

	return true;
}

View File

@ -0,0 +1,102 @@
#pragma once
#include <solanaceae/toxcore/tox_event_interface.hpp>
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
#include <entt/container/dense_map.hpp>
#include <solanaceae/util/span.hpp>
#include <vector>
#include <deque>
// fwd
class ToxContactModel2;
// A requested chatlog time range. The range is walked newest to oldest:
// ts_start is the newest bound, ts_end the oldest.
struct TimeRangeRequest {
	uint64_t ts_start{0}; // newest timestamp of the range
	uint64_t ts_end{0}; // oldest timestamp of the range
};
// TODO: move to own file
namespace Components {

	// Per-peer queue of received-but-not-yet-serviced time-range requests.
	struct IncommingTimeRangeRequestQueue {
		struct Entry {
			TimeRangeRequest ir; // the requested range
			std::vector<uint8_t> fid; // the request's file id (raw start+end ts bytes)
		};
		std::deque<Entry> _queue;

		// we should remove/notadd queued requests
		// that are subsets of same or larger ranges
		void queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid);
	};

	// Per-peer map of in-flight chatlog uploads, keyed by ft transfer id.
	struct IncommingTimeRangeRequestRunning {
		struct Entry {
			TimeRangeRequest ir; // the range being answered
			std::vector<uint8_t> data; // transfer data in memory
			float last_activity {0.f}; // seconds since last send_data; reaped after 60s
		};
		entt::dense_map<uint8_t, Entry> _list;
	};

} // Components
// Group-history-sync (HS2) sending side: answers peers' time-range requests
// by serializing the local chatlog to msgpack and sending it as an NGC
// filetransfer (see the HS2 spec draft).
class NGCHS2Sigma : public RegistryMessageModelEventI, public NGCFT1EventI {
	Contact3Registry& _cr;
	RegistryMessageModelI& _rmm;
	ToxContactModel2& _tcm;
	NGCFT1& _nft;
	NGCFT1EventProviderI::SubscriptionReference _nftep_sr;

	// rate limit for iterate()'s queue scan
	float _iterate_heat {0.f};
	constexpr static float _iterate_cooldown {1.22f}; // sec

	// open/running range requests (by c)
	// comp on peer c

	// open/running range responses (by c)
	// comp on peer c

	// limit to 2 uploads per peer simultaneously
	// TODO: increase for prod (4?) or maybe even lower?
	// currently per type
	constexpr static size_t _max_parallel_per_peer {2};

	// only serve messages this client observed itself (credibility filter)
	constexpr static bool _only_send_self_observed {true};
	constexpr static int64_t _max_time_into_past_default {60*15}; // s

	public:
		NGCHS2Sigma(
			Contact3Registry& cr,
			RegistryMessageModelI& rmm,
			ToxContactModel2& tcm,
			NGCFT1& nf
		);

		~NGCHS2Sigma(void);

		// drives queued requests and transfer timeouts; call regularly
		float iterate(float delta);

		// parse + queue an incoming HS2 time-range request
		void handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request&);

		// msg reg contact
		// time ranges
		[[nodiscard]] std::vector<uint8_t> buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end);

	protected:
		bool onEvent(const Message::Events::MessageConstruct&) override;
		bool onEvent(const Message::Events::MessageUpdated&) override;
		bool onEvent(const Message::Events::MessageDestory&) override;

	protected:
		bool onEvent(const Events::NGCFT1_recv_request&) override;
		bool onEvent(const Events::NGCFT1_send_data&) override;
		bool onEvent(const Events::NGCFT1_send_done&) override;
};

View File

@ -0,0 +1,112 @@
# [NGC] Group-History-Sync (v2.1) [PoC] [Draft]
Simple group history sync that uses `timestamp` + `peer public key` + `message_id` (`ts+ppk+mid`) to, mostly, uniquely identify messages and deliver them.
Messages are bundled up in a `msgpack` `array` and sent as a file transfer.
## Requirements
TODO: more?
### Msgpack
For serializing the messages.
### File transfers
For sending packs of messages.
Even a single message can be larger than a single custom packet, so this is a must-have.
This also allows for compression down the road.
## Procedure
Peer A can request `ts+ppk+mid+msg` list for a given time range from peer B.
Peer B then sends a filetransfer (with a special file type) containing a list of `ts+ppk+mid+msg`.
Optionally compressed. (Delta-coding? / zstd)
Peer A keeps doing that until the desired time span is covered.
During all that, peer B usually does the same thing to peer A.
TODO: deny request explicitly. also why (like perms and time range too large etc)
## Traffic savings
It is recommended to remember whether a range has already been requested and answered by a given peer, to reduce traffic.
While compression is optional, it is recommended.
Timestamps fit delta coding.
Peer keys fit dicts.
Message ids are mostly high entropy.
The Message itself is text, so dict/huffman fits well.
TODO: store the 4 columns SoA instead of AoS ?
## Message uniqueness
This protocol relies on the randomness of `message_id` and the clocks to be more or less synchronized.
However, `message_id` can be manipulated freely by any peer, this can make messages appear as duplicates.
This can be exploited here, if you don't wish your messages to be synchronized (to an extent).
## Security
Only sync publicly sent/received messages.
Only allow sync or extended time ranges from peers you trust (enough).
The default shall be to not offer any messages.
Indirect messages shall be low in credibility, while direct synced (by author), with mid credibility.
Either only high or mid credibility shall be sent.
Manual exceptions to all can be made at the users discretion, eg for other self owned devices.
## File transfer requests
TODO: is reusing the ft request api a good idea for this?
| fttype | name | content (ft id) |
|------------|------|---------------------|
| 0x00000f02 | time range msgpack | - ts start <br/> - ts end |
## File transfer content
| fttype | name | content | note |
|------------|------|----------------------------|---|
| 0x00000f02 | time range msgpack | `message list` in msgpack | |
### time range msgpack
Msgpack array of messages.
```
name | type/size | note
-------------------------|-------------------|-----
- array | 32bit number msgs
- ts | 64bit deciseconds
- ppk | 32bytes
- mid | 16bit
- msgtype | enum (string or number?)
- if text/action |
- text | string | maybe byte array instead?
- if file |
- fkind | 32bit enum | is this right?
- fid | bytes kind | length depends on kind
```
Name is the actual string key.
Data type sizes are suggestions, if not defined by the tox protocol.
How unknown `msgtype`s are handled is client defined.
They can be fully ignored or displayed as broken.
## TODO
- [ ] figure out a pro-active approach (instead of waiting for a range request)
- [ ] compression in the ft layer? (would make it reusable) hint/autodetect/autoenable for >1k ?

View File

@ -0,0 +1,82 @@
#include "./ts_find_start.hpp"
#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/message3/components.hpp>
#include <cassert>
// Standalone test for find_start_by_ts() (ts_find_start.hpp): checks the
// binary search against an empty registry, single messages newer / equal /
// older than the probe ts, and a multi-message registry sorted newest-first.
int main(void) {
	Message3Registry msg_reg;

	{
		std::cout << "TEST empty reg\n";
		// empty view -> no start point
		auto ts_view = msg_reg.view<Message::Components::Timestamp>();
		const auto res = find_start_by_ts(ts_view, 42);
		assert(res == ts_view.end());
	}

	{
		std::cout << "TEST single msg newer (fail)\n";
		// a message newer than the probe must not be found
		Message3Handle msg{msg_reg, msg_reg.create()};
		msg.emplace<Message::Components::Timestamp>(43ul);
		auto ts_view = msg_reg.view<Message::Components::Timestamp>();
		const auto res = find_start_by_ts(ts_view, 42);
		assert(res == ts_view.end());
		msg.destroy();
	}

	{
		std::cout << "TEST single msg same (succ)\n";
		// an exact timestamp match counts as "not newer" and is found
		Message3Handle msg{msg_reg, msg_reg.create()};
		msg.emplace<Message::Components::Timestamp>(42ul);
		auto ts_view = msg_reg.view<Message::Components::Timestamp>();
		const auto res = find_start_by_ts(ts_view, 42);
		assert(res != ts_view.end());
		msg.destroy();
	}

	{
		std::cout << "TEST single msg older (succ)\n";
		// an older message is a valid start point
		Message3Handle msg{msg_reg, msg_reg.create()};
		msg.emplace<Message::Components::Timestamp>(41ul);
		auto ts_view = msg_reg.view<Message::Components::Timestamp>();
		const auto res = find_start_by_ts(ts_view, 42);
		assert(res != ts_view.end());
		msg.destroy();
	}

	{
		std::cout << "TEST multi msg\n";
		Message3Handle msg{msg_reg, msg_reg.create()};
		msg.emplace<Message::Components::Timestamp>(41ul);
		Message3Handle msg2{msg_reg, msg_reg.create()};
		msg2.emplace<Message::Components::Timestamp>(42ul);
		Message3Handle msg3{msg_reg, msg_reg.create()};
		msg3.emplace<Message::Components::Timestamp>(43ul);

		// sort newest-first, as the search expects
		// see message3/message_time_sort.cpp
		msg_reg.sort<Message::Components::Timestamp>([](const auto& lhs, const auto& rhs) -> bool {
			return lhs.ts > rhs.ts;
		}, entt::insertion_sort{});

		auto ts_view = msg_reg.view<Message::Components::Timestamp>();
		auto res = find_start_by_ts(ts_view, 42);
		assert(res != ts_view.end());
		// start lands on the ts==42 message; the next is the older ts==41
		assert(*res == msg2);
		res++;
		assert(*res == msg);

		msg3.destroy();
		msg2.destroy();
		msg.destroy();
	}

	return 0;
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <algorithm>
#include <cstdint>
#include <iostream>
// Binary search for the first entry whose timestamp is not newer than
// ts_start. Expects `view` to be sorted by timestamp, newest first
// (descending ts). Returns view.end() when every entry is newer.
template<typename View>
auto find_start_by_ts(const View& view, uint64_t ts_start) {
	std::cout << "!!!! starting msg ts search, ts_start:" << ts_start << "\n";

	// descending order: "less than" for lower_bound means "newer than"
	const auto newer_than = [&view](const auto& ent, const auto& ts) {
		const auto& [ts_comp] = view.get(ent);
		return ts_comp.ts > ts;
	};

	const auto it = std::lower_bound(view.begin(), view.end(), ts_start, newer_than);

	if (it == view.end()) {
		std::cout << "!!!! no first value not newer than start ts\n";
	} else {
		const auto& [ts_comp] = view.get(*it);
		std::cout << "!!!! first value not newer than start ts is " << ts_comp.ts << "\n";
	}

	return it;
}