Compare commits

..

29 Commits

Author SHA1 Message Date
Green Sky
7cd68845ca
forgot to reset timer on recv data 2024-12-19 17:17:10 +01:00
Green Sky
f40907d42a
use actual activity for receiving transfers for iterate interval 2024-12-19 13:33:30 +01:00
Green Sky
6d7d643207
adjust flow to iterate speed
higher ft1sha1 iterate interval (not fixing it yet)
2024-12-16 12:52:33 +01:00
Green Sky
b35babe3f8
switch to os provided remote have set and other fixes 2024-12-15 01:23:08 +01:00
Green Sky
78390dd342
update object, object update lock and rare crash 2024-12-13 01:30:03 +01:00
Green Sky
eb169b2779
add participation and log spam fix 2024-12-11 23:23:05 +01:00
Green Sky
100279a483
fixes 2024-12-10 17:57:53 +01:00
Green Sky
60b3d5d941
big ft fixes, mostly for info, but also other stuff 2024-12-10 17:18:28 +01:00
Green Sky
6ad2905e07
hs2: change msgpack format and fixes 2024-12-09 23:38:07 +01:00
Green Sky
930c829031
rizzler working, more fixes everywhere
there still are some crashes that needs workarounds
2024-12-09 22:58:36 +01:00
Green Sky
a139f412b1
various fixes resulting in the first time running the code 2024-12-08 16:08:30 +01:00
Green Sky
abdf6672bf
hs2 plug, rizzler still non functional 2024-12-08 14:48:24 +01:00
Green Sky
7bbaa9b929
move in plugin 2024-12-08 13:17:22 +01:00
Green Sky
70620a901b
starting the work on hs2 rizzler 2024-12-07 23:34:19 +01:00
Green Sky
231928339e
sigma 2024-12-07 11:38:31 +01:00
Green Sky
294c5346ca
hs2 send done, but untested 2024-12-06 22:41:05 +01:00
Green Sky
adeaca4efe
early out have when we have all 2024-12-06 16:36:02 +01:00
Green Sky
ba809eda43
refactor and test ts start search 2024-12-06 14:11:06 +01:00
Green Sky
b9a7c75d20
hs: progess on the log file generator, fully written but untested
ft: timeout changes and wording fixes
2024-12-05 23:06:44 +01:00
Green Sky
04b6f7925a
use cache for group peer to contact lookup (~5% cpu) 2024-12-02 14:27:39 +01:00
Green Sky
5601ad91f5
a bunch of allocation optimizations 2024-12-02 13:08:47 +01:00
Green Sky
741f1428d3
updated spec to 2.1, changed almost everything
other small fixes
2024-11-28 12:59:14 +01:00
Green Sky
34dc01d4dc
fix chunk picker round robin actually working
(worked kinda bc of bug before)
make priority dynamic and fix skipping to make it work too
2024-11-22 13:51:48 +01:00
Green Sky
1bf1fbce75
small hs progress 2024-11-22 13:51:25 +01:00
37b92f67c8
respect voice state for receiving file messages
we should also check on send in the future
2024-11-06 10:50:42 +01:00
01c892df8c
tweak finishing timer 2024-11-06 10:49:24 +01:00
6eb5826616
split recv and send, they dont share any code (probably) 2024-11-03 18:21:02 +01:00
2e6b15e4ad
more hs drafting 2024-11-01 11:31:05 +01:00
63de78aaeb
add spec draft to repo 2024-10-31 15:46:48 +01:00
31 changed files with 2011 additions and 439 deletions

View File

@ -1,9 +1,21 @@
cmake_minimum_required(VERSION 3.9 FATAL_ERROR)
cmake_minimum_required(VERSION 3.24 FATAL_ERROR)
add_subdirectory(./external)
project(solanaceae)
if (CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR)
set(SOLANACEAE_NGCFT1_STANDALONE ON)
else()
set(SOLANACEAE_NGCFT1_STANDALONE OFF)
endif()
message("II SOLANACEAE_NGCFT1_STANDALONE " ${SOLANACEAE_NGCFT1_STANDALONE})
option(SOLANACEAE_NGCFT1_BUILD_PLUGINS "Build the solanaceae_ngcft1 plugins" ${SOLANACEAE_NGCFT1_BUILD_PLUGINS})
# TODO: move this stuff to src
########################################
add_library(solanaceae_ngcext
./solanaceae/ngc_ext/ngcext.hpp
./solanaceae/ngc_ext/ngcext.cpp
@ -43,21 +55,6 @@ target_link_libraries(solanaceae_ngcft1 PUBLIC
########################################
add_library(solanaceae_ngchs2
./solanaceae/ngc_hs2/ngc_hs2.hpp
./solanaceae/ngc_hs2/ngc_hs2.cpp
)
target_include_directories(solanaceae_ngchs2 PUBLIC .)
target_compile_features(solanaceae_ngchs2 PUBLIC cxx_std_17)
target_link_libraries(solanaceae_ngchs2 PUBLIC
solanaceae_ngcft1
solanaceae_tox_contacts
solanaceae_message3
solanaceae_object_store
)
########################################
add_library(solanaceae_sha1_ngcft1
# hacky deps
./solanaceae/ngc_ft1_sha1/mio.hpp
@ -117,8 +114,6 @@ target_link_libraries(solanaceae_sha1_ngcft1 PUBLIC
solanaceae_file2
)
########################################
option(SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING "Build the solanaceae_ngcft1_sha1 tests" OFF)
message("II SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING " ${SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING})
@ -136,3 +131,51 @@ if (SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING)
endif()
########################################
add_library(solanaceae_ngchs2
./solanaceae/ngc_hs2/serl.hpp
./solanaceae/ngc_hs2/ts_find_start.hpp
./solanaceae/ngc_hs2/ngc_hs2_sigma.hpp
./solanaceae/ngc_hs2/ngc_hs2_sigma.cpp
./solanaceae/ngc_hs2/ngc_hs2_rizzler.hpp
./solanaceae/ngc_hs2/ngc_hs2_rizzler.cpp
)
target_include_directories(solanaceae_ngchs2 PUBLIC .)
target_compile_features(solanaceae_ngchs2 PUBLIC cxx_std_17)
target_link_libraries(solanaceae_ngchs2 PUBLIC
solanaceae_ngcft1
solanaceae_sha1_ngcft1 # HACK: properly abstract filekind/-id
solanaceae_tox_contacts
solanaceae_message3
solanaceae_object_store
nlohmann_json::nlohmann_json
)
option(SOLANACEAE_NGCHS2_BUILD_TESTING "Build the solanaceae_ngchs2 tests" OFF)
message("II SOLANACEAE_NGCHS2_BUILD_TESTING " ${SOLANACEAE_NGCHS2_BUILD_TESTING})
if (SOLANACEAE_NGCHS2_BUILD_TESTING)
include(CTest)
add_executable(test_hs2_ts_binarysearch
./solanaceae/ngc_hs2/test_ts_binarysearch.cpp
)
target_link_libraries(test_hs2_ts_binarysearch PUBLIC
solanaceae_ngchs2
)
add_test(NAME test_hs2_ts_binarysearch COMMAND test_hs2_ts_binarysearch)
endif()
########################################
if (SOLANACEAE_NGCFT1_BUILD_PLUGINS)
add_subdirectory(./plugins)
endif()

View File

@ -2,3 +2,13 @@ cmake_minimum_required(VERSION 3.9 FATAL_ERROR)
add_subdirectory(./sha1)
# we are running a custom msgpack serialization for hs2
if (NOT TARGET nlohmann_json::nlohmann_json)
FetchContent_Declare(json
URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz
URL_HASH SHA256=d6c65aca6b1ed68e7a182f4757257b107ae403032760ed6ef121c9d55e81757d
EXCLUDE_FROM_ALL
)
FetchContent_MakeAvailable(json)
endif()

33
plugins/CMakeLists.txt Normal file
View File

@ -0,0 +1,33 @@
cmake_minimum_required(VERSION 3.9...3.24 FATAL_ERROR)
########################################
add_library(plugin_ngcft1 MODULE
./plugin_ngcft1.cpp
)
target_compile_features(plugin_ngcft1 PUBLIC cxx_std_17)
set_target_properties(plugin_ngcft1 PROPERTIES
C_VISIBILITY_PRESET hidden
)
target_compile_definitions(plugin_ngcft1 PUBLIC ENTT_API_IMPORT)
target_link_libraries(plugin_ngcft1 PUBLIC
solanaceae_plugin
solanaceae_ngcext
solanaceae_ngcft1
solanaceae_sha1_ngcft1
)
########################################
add_library(plugin_ngchs2 MODULE
./plugin_ngchs2.cpp
)
target_compile_features(plugin_ngchs2 PUBLIC cxx_std_17)
set_target_properties(plugin_ngchs2 PROPERTIES
C_VISIBILITY_PRESET hidden
)
target_compile_definitions(plugin_ngchs2 PUBLIC ENTT_API_IMPORT)
target_link_libraries(plugin_ngchs2 PUBLIC
solanaceae_plugin
solanaceae_ngchs2
)

81
plugins/plugin_ngcft1.cpp Normal file
View File

@ -0,0 +1,81 @@
#include <solanaceae/plugin/solana_plugin_v1.h>
#include <solanaceae/ngc_ext/ngcext.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
#include <solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp>
#include <entt/entt.hpp>
#include <entt/fwd.hpp>
#include <memory>
#include <iostream>
static std::unique_ptr<NGCEXTEventProvider> g_ngcextep = nullptr;
// TODO: make sep plug
static std::unique_ptr<NGCFT1> g_ngcft1 = nullptr;
static std::unique_ptr<SHA1_NGCFT1> g_sha1_ngcft1 = nullptr;
constexpr const char* plugin_name = "NGCEXT";
extern "C" {
SOLANA_PLUGIN_EXPORT const char* solana_plugin_get_name(void) {
return plugin_name;
}
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_get_version(void) {
return SOLANA_PLUGIN_VERSION;
}
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api) {
std::cout << "PLUGIN " << plugin_name << " START()\n";
if (solana_api == nullptr) {
return 1;
}
try {
auto* os = PLUG_RESOLVE_INSTANCE(ObjectStore2);
auto* tox_i = PLUG_RESOLVE_INSTANCE(ToxI);
auto* tox_event_provider_i = PLUG_RESOLVE_INSTANCE(ToxEventProviderI);
auto* cr = PLUG_RESOLVE_INSTANCE_VERSIONED(Contact3Registry, "1");
auto* rmm = PLUG_RESOLVE_INSTANCE(RegistryMessageModelI);
auto* tcm = PLUG_RESOLVE_INSTANCE(ToxContactModel2);
// static store, could be anywhere tho
// construct with fetched dependencies
g_ngcextep = std::make_unique<NGCEXTEventProvider>(*tox_i, *tox_event_provider_i);
g_ngcft1 = std::make_unique<NGCFT1>(*tox_i, *tox_event_provider_i, *g_ngcextep.get());
g_sha1_ngcft1 = std::make_unique<SHA1_NGCFT1>(*os, *cr, *rmm, *g_ngcft1.get(), *tcm, *tox_event_provider_i, *g_ngcextep.get());
// register types
PLUG_PROVIDE_INSTANCE(NGCEXTEventProviderI, plugin_name, g_ngcextep.get());
PLUG_PROVIDE_INSTANCE(NGCFT1EventProviderI, plugin_name, g_ngcft1.get());
PLUG_PROVIDE_INSTANCE(NGCFT1, plugin_name, g_ngcft1.get());
PLUG_PROVIDE_INSTANCE(SHA1_NGCFT1, plugin_name, g_sha1_ngcft1.get());
} catch (const ResolveException& e) {
std::cerr << "PLUGIN " << plugin_name << " " << e.what << "\n";
return 2;
}
return 0;
}
SOLANA_PLUGIN_EXPORT void solana_plugin_stop(void) {
std::cout << "PLUGIN " << plugin_name << " STOP()\n";
g_sha1_ngcft1.reset();
g_ngcft1.reset();
g_ngcextep.reset();
}
SOLANA_PLUGIN_EXPORT float solana_plugin_tick(float delta) {
const float ft_interval = g_ngcft1->iterate(delta);
const float sha_interval = g_sha1_ngcft1->iterate(delta);
return std::min<float>(ft_interval, sha_interval);
}
} // extern C

79
plugins/plugin_ngchs2.cpp Normal file
View File

@ -0,0 +1,79 @@
#include <solanaceae/plugin/solana_plugin_v1.h>
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
#include <solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp> // this hurts
#include <solanaceae/ngc_hs2/ngc_hs2_sigma.hpp>
#include <solanaceae/ngc_hs2/ngc_hs2_rizzler.hpp>
#include <entt/entt.hpp>
#include <entt/fwd.hpp>
#include <memory>
#include <iostream>
// https://youtu.be/OwT83dN82pc
static std::unique_ptr<NGCHS2Sigma> g_ngchs2s = nullptr;
static std::unique_ptr<NGCHS2Rizzler> g_ngchs2r = nullptr;
constexpr const char* plugin_name = "NGCHS2";
extern "C" {
SOLANA_PLUGIN_EXPORT const char* solana_plugin_get_name(void) {
return plugin_name;
}
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_get_version(void) {
return SOLANA_PLUGIN_VERSION;
}
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api) {
std::cout << "PLUGIN " << plugin_name << " START()\n";
if (solana_api == nullptr) {
return 1;
}
try {
//auto* tox_i = PLUG_RESOLVE_INSTANCE(ToxI);
auto* tox_event_provider_i = PLUG_RESOLVE_INSTANCE(ToxEventProviderI);
auto* cr = PLUG_RESOLVE_INSTANCE_VERSIONED(Contact3Registry, "1");
auto* rmm = PLUG_RESOLVE_INSTANCE(RegistryMessageModelI);
auto* tcm = PLUG_RESOLVE_INSTANCE(ToxContactModel2);
auto* ngcft1 = PLUG_RESOLVE_INSTANCE(NGCFT1);
auto* sha1_ngcft1 = PLUG_RESOLVE_INSTANCE(SHA1_NGCFT1);
// static store, could be anywhere tho
// construct with fetched dependencies
g_ngchs2s = std::make_unique<NGCHS2Sigma>(*cr, *rmm, *tcm, *ngcft1);
g_ngchs2r = std::make_unique<NGCHS2Rizzler>(*cr, *rmm, *tcm, *ngcft1, *tox_event_provider_i, *sha1_ngcft1);
// register types
PLUG_PROVIDE_INSTANCE(NGCHS2Sigma, plugin_name, g_ngchs2s.get());
PLUG_PROVIDE_INSTANCE(NGCHS2Rizzler, plugin_name, g_ngchs2r.get());
} catch (const ResolveException& e) {
std::cerr << "PLUGIN " << plugin_name << " " << e.what << "\n";
return 2;
}
return 0;
}
SOLANA_PLUGIN_EXPORT void solana_plugin_stop(void) {
std::cout << "PLUGIN " << plugin_name << " STOP()\n";
g_ngchs2r.reset();
g_ngchs2s.reset();
}
SOLANA_PLUGIN_EXPORT float solana_plugin_tick(float delta) {
const float sigma_interval = g_ngchs2s->iterate(delta);
const float rizzler_interval = g_ngchs2r->iterate(delta);
return std::min<float>(sigma_interval, rizzler_interval);
}
} // extern C

View File

@ -255,6 +255,7 @@ bool NGCEXTEventProvider::parse_ft1_data_ack(
_DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
e.transfer_id = data[curser++];
e.sequence_ids.reserve(std::max<int64_t>(data_size-curser, 1)/sizeof(uint16_t));
while (curser < data_size) {
_DATA_HAVE(sizeof(uint16_t), std::cerr << "NGCEXT: packet too small, missing seq_id\n"; return false)
uint16_t seq_id = data[curser++];

View File

@ -61,10 +61,6 @@ struct CCAI {
// returns -1 if not implemented, can return 0
virtual int64_t inFlightBytes(void) const { return -1; }
// returns -1 if not implemented, can return 0
// excluded timed out packets (not those currently resent)
virtual int64_t inFlightBytesAccounted(void) const { return -1; }
public: // callbacks
// data size is without overhead
virtual void onSent(SeqIDType seq, size_t data_size) = 0;

View File

@ -93,8 +93,7 @@ int64_t CUBIC::canSend(float time_delta) {
}
const auto window = getCWnD();
//int64_t cspace_bytes = window - _in_flight_bytes;
int64_t cspace_bytes = window - _in_flight_bytes_accounted;
int64_t cspace_bytes = window - _in_flight_bytes;
if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) {
//std::cerr << "CUBIC: cspace < seg size\n";
return 0u;

View File

@ -7,7 +7,9 @@
float FlowOnly::getCurrentDelay(void) const {
// below 1ms is useless
return std::clamp(_rtt_ema, 0.001f, RTT_MAX);
//return std::clamp(_rtt_ema, 0.001f, RTT_MAX);
// the current iterate rate min is 5ms
return std::clamp(_rtt_ema, 0.005f, RTT_MAX);
}
void FlowOnly::addRTT(float new_delay) {
@ -29,25 +31,6 @@ void FlowOnly::updateWindow(void) {
_fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
}
void FlowOnly::updateAccounted(void) {
int64_t size_timedout {0};
{ // can be expensive
// code see getTimeouts()
// after 3 rtt delay, we trigger timeout
const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;
for (const auto& [seq, time_stamp, size, _] : _in_flight) {
if (now_adjusted > time_stamp) {
//list.push_back(seq);
size_timedout += size;
}
}
}
_in_flight_bytes_accounted = _in_flight_bytes - size_timedout;
}
void FlowOnly::updateCongestion(void) {
updateWindow();
const auto tmp_window = getWindow();
@ -89,10 +72,8 @@ int64_t FlowOnly::canSend(float time_delta) {
}
updateWindow();
updateAccounted();
//int64_t fspace = _fwnd - _in_flight_bytes;
int64_t fspace = _fwnd - _in_flight_bytes_accounted;
int64_t fspace = _fwnd - _in_flight_bytes;
if (fspace < MAXIMUM_SEGMENT_DATA_SIZE) {
return 0u;
}
@ -107,6 +88,7 @@ int64_t FlowOnly::canSend(float time_delta) {
std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
std::vector<SeqIDType> list;
list.reserve(_in_flight.size()/3); // we dont know, so we just guess
// after 3 rtt delay, we trigger timeout
const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;
@ -128,10 +110,6 @@ int64_t FlowOnly::inFlightBytes(void) const {
return _in_flight_bytes;
}
int64_t FlowOnly::inFlightBytesAccounted(void) const {
return _in_flight_bytes_accounted;
}
void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
if constexpr (true) {
size_t sum {0u};

View File

@ -12,7 +12,7 @@ struct FlowOnly : public CCAI {
public: // config
static constexpr float RTT_EMA_ALPHA = 0.001f; // might need change over time
static constexpr float RTT_UP_MAX = 3.0f; // how much larger a delay can be to be taken into account
static constexpr float RTT_MAX = 2.f; // 2 sec is probably too much
static constexpr float RTT_MAX = 2.f; // maybe larger for tunneled connections
protected:
// initialize to low value, will get corrected very fast
@ -32,7 +32,6 @@ struct FlowOnly : public CCAI {
};
std::vector<FlyingBunch> _in_flight;
int64_t _in_flight_bytes {0};
int64_t _in_flight_bytes_accounted {0};
int32_t _consecutive_events {0};
@ -59,8 +58,6 @@ struct FlowOnly : public CCAI {
void updateWindow(void);
void updateAccounted(void);
virtual void onCongestion(void) {};
// internal logic, calls the onCongestion() event
@ -80,7 +77,6 @@ struct FlowOnly : public CCAI {
int64_t inFlightCount(void) const override;
int64_t inFlightBytes(void) const override;
int64_t inFlightBytesAccounted(void) const override;
public: // callbacks
// data size is without overhead

View File

@ -56,7 +56,9 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
} else {
#if 0 // too spammy
std::cerr << "NGCFT1 warning: no space to resend timedout\n";
#endif
}
}
});
@ -169,7 +171,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
}
}
void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
bool NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
if (peer.cca) {
auto timeouts = peer.cca->getTimeouts();
std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
@ -200,6 +202,7 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
}
}
bool recv_activity {false};
for (size_t idx = 0; idx < peer.recv_transfers.size(); idx++) {
if (!peer.recv_transfers.at(idx).has_value()) {
continue;
@ -209,20 +212,21 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
// proper switch case?
if (transfer.state == Group::Peer::RecvTransfer::State::FINISHING) {
transfer.finishing_timer -= time_delta;
if (transfer.finishing_timer <= 0.f) {
//dispatch(
// NGCFT1_Event::recv_done,
// Events::NGCFT1_recv_done{
// group_number, peer_number,
// uint8_t(idx)
// }
//);
transfer.timer -= time_delta;
if (transfer.timer <= 0.f) {
peer.recv_transfers.at(idx).reset();
}
recv_activity = true; // count as activity, not sure we need this
} else {
transfer.timer += time_delta;
if (transfer.timer < 0.5f) {
// back off when no activity
recv_activity = true;
}
}
}
return peer.active_send_transfers > 0 || recv_activity;
}
const CCAI* NGCFT1::getPeerCCA(
@ -268,37 +272,61 @@ NGCFT1::NGCFT1(
float NGCFT1::iterate(float time_delta) {
_time_since_activity += time_delta;
bool transfer_in_progress {false};
bool transfer_activity {false};
for (auto& [group_number, group] : groups) {
for (auto& [peer_number, peer] : group.peers) {
iteratePeer(time_delta, group_number, peer_number, peer);
transfer_activity = transfer_activity || iteratePeer(time_delta, group_number, peer_number, peer);
#if 0
// find any active transfer
if (!transfer_in_progress) {
if (!transfer_activity) {
for (const auto& t : peer.send_transfers) {
if (t.has_value()) {
transfer_in_progress = true;
transfer_activity = true;
#if 0
std::cout
<< "--- active send transfer "
<< group_number << ":" << peer_number
<< "(" << std::get<0>(_t.toxGroupPeerGetName(group_number, peer_number)).value_or("<unk>") << ")"
<< " fk:" << t.value().file_kind
<< " state:" << (int)t.value().state
<< " tsa:" << t.value().time_since_activity
<< "\n"
;
#endif
break;
}
}
}
if (!transfer_in_progress) {
if (!transfer_activity) {
for (const auto& t : peer.recv_transfers) {
if (t.has_value()) {
transfer_in_progress = true;
transfer_activity = true;
#if 0
std::cout
<< "--- active recv transfer "
<< group_number << ":" << peer_number
<< "(" << std::get<0>(_t.toxGroupPeerGetName(group_number, peer_number)).value_or("<unk>") << ")"
<< " fk:" << t.value().file_kind
<< " state:" << (int)t.value().state
<< " ft:" << t.value().finishing_timer
<< "\n"
;
#endif
break;
}
}
}
#endif
}
}
if (transfer_in_progress) {
if (transfer_activity) {
_time_since_activity = 0.f;
// ~15ms for up to 1mb/s
// ~5ms for up to 4mb/s
return 0.005f; // 5ms
} else if (_time_since_activity < 0.5f) {
} else if (_time_since_activity < 1.0f) {
// bc of temporality
return 0.025f;
} else {
@ -306,13 +334,12 @@ float NGCFT1::iterate(float time_delta) {
}
}
void NGCFT1::NGC_FT1_send_request_private(
bool NGCFT1::NGC_FT1_send_request_private(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, uint32_t file_id_size
) {
// TODO: error check
_neep.send_ft1_request(group_number, peer_number, file_kind, file_id, file_id_size);
return _neep.send_ft1_request(group_number, peer_number, file_kind, file_id, file_id_size);
}
bool NGCFT1::NGC_FT1_send_init_private(
@ -456,44 +483,6 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
//#if !NDEBUG
std::cout << "NGCFT1: got FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << int(e.transfer_id) << " [" << bin2hex(e.file_id) << "]\n";
//#endif
#if 0
bool accept = false;
dispatch(
NGCFT1_Event::recv_init,
Events::NGCFT1_recv_init{
e.group_number, e.peer_number,
static_cast<NGCFT1_file_kind>(e.file_kind),
e.file_id.data(), static_cast<uint32_t>(e.file_id.size()),
e.transfer_id,
e.file_size,
accept
}
);
if (!accept) {
std::cout << "NGCFT1: rejected init\n";
return true; // return true?
}
_neep.send_ft1_init_ack(e.group_number, e.peer_number, e.transfer_id);
std::cout << "NGCFT1: accepted init\n";
auto& peer = groups[e.group_number].peers[e.peer_number];
if (peer.recv_transfers[e.transfer_id].has_value()) {
std::cerr << "NGCFT1 warning: overwriting existing recv_transfer " << int(e.transfer_id) << ", other peer started new transfer on preexising\n";
}
peer.recv_transfers[e.transfer_id] = Group::Peer::RecvTransfer{
e.file_kind,
e.file_id,
Group::Peer::RecvTransfer::State::INITED,
e.file_size,
0u,
{} // rsb
};
return true;
#else
// HACK: simply forward to init2 hanlder
return onEvent(Events::NGCEXT_ft1_init2{
e.group_number,
@ -504,12 +493,11 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
0x00, // non set
e.file_id, // sadly a copy, wont matter in the future
});
#endif
}
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
//#if !NDEBUG
std::cout << "NGCFT1: got FT1_INIT_ACK mds:" << e.max_lossy_data_size << "\n";
std::cout << "NGCFT1: got FT1_INIT_ACK " << e.group_number << ":" << e.peer_number << " mds:" << e.max_lossy_data_size << "\n";
//#endif
// we now should start sending data
@ -587,6 +575,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
}
auto& transfer = peer.recv_transfers[e.transfer_id].value();
transfer.timer = 0.f;
// do reassembly, ignore dups
transfer.rsb.add(e.sequence_id, std::vector<uint8_t>(e.data)); // TODO: ugly explicit copy for what should just be a move
@ -625,7 +614,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
// TODO: keep around for remote timeout + delay + offset, so we can be sure all acks where received
// or implement a dedicated finished that needs to be acked
transfer.finishing_timer = 0.75f; // TODO: we are receiving, we dont know delay
//transfer.timer = 0.75f; // TODO: we are receiving, we dont know delay
transfer.timer = FlowOnly::RTT_MAX;
dispatch(
NGCFT1_Event::recv_done,
@ -669,6 +659,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
{
std::vector<CCAI::SeqIDType> seqs;
seqs.reserve(e.sequence_ids.size());
for (const auto it : e.sequence_ids) {
// TODO: improve this o.o
seqs.push_back({e.transfer_id, it});

View File

@ -156,16 +156,17 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
std::vector<uint8_t> file_id;
enum class State {
INITED, //init acked, but no data received yet (might be dropped)
RECV, // receiving data
FINISHING, // got all the data, but we wait for 2*delay, since its likely there is data still arriving
INITED, // init acked, but no data received yet (might be dropped)
RECV, // receiving data
FINISHING, // got all the data, but we wait for 2*delay, since its likely there is data still arriving
} state;
uint64_t file_size {0};
uint64_t file_size_current {0};
// if state INITED or RECV, time since last activity
// if state FINISHING and it reaches 0, delete
float finishing_timer {0.f};
float timer {0.f};
// sequence id based reassembly
RecvSequenceBuffer rsb;
@ -209,7 +210,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
protected:
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
bool iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
const CCAI* getPeerCCA(uint32_t group_number, uint32_t peer_number) const;
@ -223,7 +224,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
float iterate(float delta);
public: // ft1 api
void NGC_FT1_send_request_private(
bool NGC_FT1_send_request_private(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, uint32_t file_id_size

View File

@ -16,7 +16,7 @@ enum class NGCFT1_file_kind : uint32_t {
// id: TOX_FILE_ID_LENGTH (32) bytes
// this is basically and id and probably not a hash, like the tox friend api
// this id can be unique between 2 peers
ID = 8u,
ID = 8u, // TODO: this is actually DATA and 0
// id: hash of the info, like a torrent infohash (using the same hash as the data)
// TODO: determain internal format
@ -75,20 +75,7 @@ enum class NGCFT1_file_kind : uint32_t {
// https://gist.github.com/Green-Sky/440cd9817a7114786850eb4c62dc57c3
// id: ts start, ts end
// content:
// - ts start (do we need this? when this is part of the id?)
// - ts end (same)
// - list size
// - ppk
// - mid
// - ts
HS2_INFO_RANGE_TIME = 0x00000f00,
// TODO: half open ranges
// TODO: id based
// TODO: ppk based?
// id: ppk, mid, ts
HS2_SINGLE_MESSAGE,
// TODO: message pack
HS2_RANGE_TIME = 0x00000f00, // TODO: remove, did not survive
HS2_RANGE_TIME_MSGPACK = 0x00000f02,
};

View File

@ -133,6 +133,7 @@ void ChunkPicker::updateParticipation(
entt::dense_set<Object> checked;
for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) {
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
const ObjectHandle o {objreg, ov};
if (participating_unfinished.contains(o)) {
@ -150,6 +151,21 @@ void ChunkPicker::updateParticipation(
participating_unfinished.erase(o);
continue;
}
// TODO: optimize this to only change on dirty, or something
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
Priority prio = o.get<ObjComp::Ephemeral::File::DownloadPriority>().p;
uint16_t pskips =
prio == Priority::HIGHEST ? 0u :
prio == Priority::HIGH ? 1u :
prio == Priority::NORMAL ? 2u :
prio == Priority::LOW ? 4u :
8u // LOWEST
;
participating_unfinished.at(o).should_skip = pskips;
}
} else {
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
continue;
@ -160,8 +176,6 @@ void ChunkPicker::updateParticipation(
}
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
//using Priority = Components::DownloadPriority::Priority;
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
Priority prio = Priority::NORMAL;
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
@ -239,13 +253,12 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
// round robin content (remember last obj)
if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
participating_in_last = participating_unfinished.begin()->first;
//participating_in_last = *participating_unfinished.begin();
}
assert(objreg.valid(participating_in_last));
auto it = participating_unfinished.find(participating_in_last);
// hard limit robin rounds to array size time 20
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++) {
// hard limit robin rounds to array size times 20
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++, it++) {
if (it == participating_unfinished.end()) {
it = participating_unfinished.begin();
}
@ -254,11 +267,12 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
it->second.skips++;
continue;
}
it->second.skips = 0;
ObjectHandle o {objreg, it->first};
// intersect self have with other have
if (!o.all_of<Components::RemoteHaveBitset, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
if (!o.all_of<ObjComp::F::RemoteHaveBitset, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
// rare case where no one else has anything
continue;
}
@ -270,7 +284,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
//const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
const auto& others_have = o.get<Components::RemoteHaveBitset>().others;
const auto& others_have = o.get<ObjComp::F::RemoteHaveBitset>().others;
auto other_it = others_have.find(c);
if (other_it == others_have.end()) {
// rare case where the other is participating but has nothing
@ -361,7 +375,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
}
}
if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) {
if (it == participating_unfinished.end()) {
participating_in_last = entt::null;
} else {
participating_in_last = it->first;

View File

@ -49,7 +49,8 @@ void ReAnnounceTimer::reset(void) {
void ReAnnounceTimer::lower(void) {
timer *= 0.1f;
last_max *= 0.1f;
//last_max *= 0.1f; // is this a good idea?
last_max *= 0.9f; // is this a good idea?
}
void TransferStatsTally::Peer::trimSent(const float time_now) {

View File

@ -70,14 +70,6 @@ namespace Components {
entt::dense_set<Contact3> participants;
};
struct RemoteHaveBitset {
struct Entry {
bool have_all {false};
BitSet have;
};
entt::dense_map<Contact3, Entry> others;
};
struct ReRequestInfoTimer {
float timer {0.f};
};

View File

@ -63,6 +63,7 @@ size_t FT1InfoSHA1::chunkSize(size_t chunk_index) const {
std::vector<uint8_t> FT1InfoSHA1::toBuffer(void) const {
std::vector<uint8_t> buffer;
buffer.reserve(256+8+4+20*chunks.size());
assert(!file_name.empty());
// TODO: optimize

View File

@ -18,6 +18,12 @@ void re_announce(
std::vector<Object> to_remove;
os_reg.view<Components::ReAnnounceTimer>().each([&os_reg, &cr, &neep, &to_remove, delta](Object ov, Components::ReAnnounceTimer& rat) {
ObjectHandle o{os_reg, ov};
// if no known targets, or no hash, remove
if (!o.all_of<Components::AnnounceTargets, Components::FT1InfoSHA1Hash>()) {
to_remove.push_back(ov);
return;
}
// TODO: pause
//// if paused -> remove
//if (o.all_of<Message::Components::Transfer::TagPaused>()) {
@ -25,10 +31,11 @@ void re_announce(
// return;
//}
// if not downloading or info incomplete -> remove
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) {
// // if not downloading or info incomplete -> remove
//if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) {
// if not downloading AND info complete -> remove
if (!o.all_of<Components::FT1ChunkSHA1Cache>() && o.all_of<Components::FT1InfoSHA1Data>()) {
to_remove.push_back(ov);
assert(false && "transfer in broken state");
return;
}

View File

@ -7,8 +7,8 @@ void ReceivingTransfers::tick(float delta) {
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
it->second.time_since_activity += delta;
// if we have not heard for 20sec, timeout
if (it->second.time_since_activity >= 20.f) {
// if we have not heard for 60sec, timeout
if (it->second.time_since_activity >= 60.f) {
std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n";
// TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading
//it->second.v

View File

@ -69,6 +69,12 @@ void SHA1_NGCFT1::updateMessages(ObjectHandle o) {
assert(o.all_of<Components::Messages>());
for (auto msg : o.get<Components::Messages>().messages) {
// FIXME: hs might create and destory messages for objects without events
// we should really do garbage collection
if (!msg) {
continue;
}
msg.emplace_or_replace<Message::Components::MessageFileObject>(o);
// messages no long hold this info
@ -82,33 +88,20 @@ void SHA1_NGCFT1::updateMessages(ObjectHandle o) {
std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ObjectHandle ce) {
// get a list of peers we can request this file from
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
// TODO: sort by con state?
// prio to direct?
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
// 1 in 20 chance to ask random peer instead
// also works well for empty SuspectedParticipants
if ((_rng()%20) == 0) {
tox_peers.clear();
// or messages? should be the same
if (!ce.all_of<Components::AnnounceTargets>()) {
// rip
std::cerr << "SHA1_NGCFT1 warning: tried random, but no AnnounceTargets\n";
return std::nullopt;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
}
}
// 1 in 40 chance to ask random peer instead
// TODO: config + tweak
// TODO: save group in content to avoid the tox_peers list build
// TODO: remove once pc1_announce is shipped
if (tox_peers.empty() || (_rng()%40) == 0) {
// meh
// HACK: determain group based on last tox_peers
if (!tox_peers.empty()) {
const uint32_t group_number = tox_peers.back().first;
auto gch = _tcm.getContactGroup(group_number);
assert(static_cast<bool>(gch));
std::vector<uint32_t> un_tox_peers;
for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
for (const auto& target : ce.get<Components::AnnounceTargets>().targets) {
for (const auto child : _cr.get<Contact::Components::ParentOf>(target).subs) {
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
@ -119,26 +112,39 @@ std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(O
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
un_tox_peers.push_back(tgpe.peer_number);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
}
}
if (un_tox_peers.empty()) {
// no one online, we are out of luck
} else {
const size_t sample_i = _rng()%un_tox_peers.size();
const auto peer_number = un_tox_peers.at(sample_i);
}
std::cout << "SHA1_NGCFT1: doing random peer select over " << tox_peers.size() << " peers\n";
} else if (ce.all_of<Components::SuspectedParticipants>()) {
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
// TODO: sort by con state?
// prio to direct?
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
return std::make_pair(group_number, peer_number);
if (_cr.all_of<Contact::Components::TagSelfStrong>(c)) {
// FIXME: how did we select ourselfs to be a suspected participant
continue;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
}
}
} else {
const size_t sample_i = _rng()%tox_peers.size();
const auto [group_number, peer_number] = tox_peers.at(sample_i);
return std::make_pair(group_number, peer_number);
}
return std::nullopt;
if (tox_peers.empty()) {
return std::nullopt;
}
const size_t sample_i = _rng()%tox_peers.size();
const auto [group_number, peer_number] = tox_peers.at(sample_i);
return std::make_pair(group_number, peer_number);
}
void SHA1_NGCFT1::queueBitsetSendFull(Contact3Handle c, ObjectHandle o) {
@ -247,10 +253,9 @@ SHA1_NGCFT1::SHA1_NGCFT1(
}
float SHA1_NGCFT1::iterate(float delta) {
//std::cerr << "---------- new tick ----------\n";
_mfb.tick(); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast
entt::dense_map<Contact3, size_t> peer_open_requests;
_peer_open_requests.clear();
{ // timers
// sending transfers
@ -293,13 +298,12 @@ float SHA1_NGCFT1::iterate(float delta) {
assert(!o.any_of<ObjComp::F::TagLocalHaveAll>());
_queue_content_want_info.push_back(o);
//_os.registry().remove<Components::ReRequestInfoTimer>(e);
o.remove<Components::ReRequestInfoTimer>();
// TODO: throw update?
}
}
{ // requested chunk timers
_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta, &peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
_os.registry().view<Components::FT1ChunkSHA1Requested>().each([this, delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
it->second.timer += delta;
@ -307,7 +311,7 @@ float SHA1_NGCFT1::iterate(float delta) {
if (it->second.timer >= 60.f) {
it = ftchunk_requested.chunks.erase(it);
} else {
peer_open_requests[it->second.c] += 1;
_peer_open_requests[it->second.c] += 1;
it++;
}
}
@ -420,6 +424,8 @@ float SHA1_NGCFT1::iterate(float delta) {
assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
//std::cout << "SHA1_NGCFT1: trying to request info\n";
auto selected_peer_opt = selectPeerForRequest(ce);
if (selected_peer_opt.has_value()) {
const auto [group_number, peer_number] = selected_peer_opt.value();
@ -434,10 +440,12 @@ float SHA1_NGCFT1::iterate(float delta) {
);
ce.emplace<Components::ReRequestInfoTimer>(0.f);
_queue_content_want_info.pop_front();
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
} else {
_queue_content_want_info.push_back(ce);
}
_queue_content_want_info.pop_front();
}
}
@ -447,7 +455,7 @@ float SHA1_NGCFT1::iterate(float delta) {
Systems::chunk_picker_updates(
_cr,
_os.registry(),
peer_open_requests,
_peer_open_requests,
_receiving_transfers,
_nft,
delta
@ -456,12 +464,12 @@ float SHA1_NGCFT1::iterate(float delta) {
// transfer statistics systems
Systems::transfer_tally_update(_os.registry(), getTimeNow());
if (peer_open_requests.empty()) {
if (_peer_open_requests.empty()) {
return 2.f;
} else {
// pretty conservative and should be ajusted on a per peer, per delay basis
// seems to do the trick
return 0.05f;
// ft1 will go lower for us, if we have unresolved info,
// we dont want to be stuck in a high tickrate
return 0.5f;
}
}
@ -552,7 +560,79 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
updateMessages(o); // nop // TODO: remove
}
ObjectHandle SHA1_NGCFT1::constructFileMessageInPlace(Message3Handle msg, NGCFT1_file_kind file_kind, ByteSpan file_id) {
if (file_kind != NGCFT1_file_kind::HASH_SHA1_INFO) {
return {};
}
// check if content exists
const std::vector<uint8_t> sha1_info_hash{file_id.cbegin(), file_id.cend()};
ObjectHandle o;
if (_info_to_content.count(sha1_info_hash)) {
o = _info_to_content.at(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has existing content\n";
} else {
// TODO: backend
o = _mfb.newObject(ByteSpan{sha1_info_hash});
_info_to_content[sha1_info_hash] = o;
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has new content\n";
}
o.get_or_emplace<Components::Messages>().messages.push_back(msg);
msg.emplace_or_replace<Message::Components::MessageFileObject>(o);
// TODO: use to_c instead?
if (const auto* from_c_comp = msg.try_get<Message::Components::ContactFrom>(); from_c_comp != nullptr && _cr.valid(from_c_comp->c)) {
Contact3Handle c{_cr, from_c_comp->c};
// TODO: check if public
// since public
if (c.all_of<Contact::Components::Parent>()) {
// TODO: if this is a dummy contact, should it have parent?
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
} else {
std::cerr << "SHA1_NGCFT1 warning: from contact has no parent, cant add to AnnounceTargets\n";
}
// TODO: if private, add c directly
}
// queue announce that we are participating
o.get_or_emplace<Components::ReAnnounceTimer>(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
// TODO: queue info dl
// TODO: queue info/check if we already have info
if (!o.all_of<Components::ReRequestInfoTimer>() && !o.all_of<Components::FT1InfoSHA1>()) {
bool in_info_want {false};
for (const auto it : _queue_content_want_info) {
if (it == o) {
in_info_want = true;
break;
}
}
if (!in_info_want) {
// TODO: check if already receiving
_queue_content_want_info.push_back(o);
}
} else if (o.all_of<Components::FT1InfoSHA1>()){
// remove from info want
o.remove<Components::ReRequestInfoTimer>();
auto it = std::find(_queue_content_want_info.cbegin(), _queue_content_want_info.cend(), o);
if (it != _queue_content_want_info.cend()) {
_queue_content_want_info.erase(it);
}
}
_os.throwEventUpdate(o);
return o;
}
bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
if (_object_update_lock) {
return false;
}
if (!e.e.all_of<ObjComp::Ephemeral::File::ActionTransferAccept>()) {
return false;
}
@ -561,10 +641,17 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
// not ready to load yet, skip
return false;
}
if (e.e.all_of<Components::FT1ChunkSHA1Cache>()) {
// now we update on recv_done, which included info
std::cerr << "SHA1_NGCFT1 warning: accepted but already has FT1ChunkSHA1Cache\n";
return false;
}
_object_update_lock = true;
assert(!e.e.all_of<ObjComp::F::TagLocalHaveAll>());
assert(!e.e.all_of<Components::FT1ChunkSHA1Cache>());
assert(!e.e.all_of<Components::FT1File2>());
//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path);
// first, open file for write(+readback)
std::string full_file_path{e.e.get<ObjComp::Ephemeral::File::ActionTransferAccept>().save_to_path};
@ -589,6 +676,7 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
// we failed opening that filepath, so we should offer the user the oportunity to save it differently
e.e.remove<ObjComp::Ephemeral::File::ActionTransferAccept>(); // stop
_object_update_lock = false;
return false;
}
@ -667,6 +755,7 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
updateMessages(e.e);
_object_update_lock = false;
return false; // ?
}
@ -717,7 +806,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
}
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
if (e.file_id_size != 20) {
// error
@ -735,7 +824,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
{ // they advertise interest in the content
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
@ -796,7 +885,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
e.accept = true;
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
SHA1Digest sha1_chunk_hash {e.file_id, e.file_id_size};
@ -809,7 +898,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
{ // they have the content (probably, might be fake, should move this to done)
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
@ -868,7 +957,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
if (!_receiving_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
std::cerr << "SHA1_NGCFT1 waring: unknown transfer " << e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n";
// not ours
//std::cerr << "SHA1_NGCFT1 warning: unknown transfer " << (int)e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n";
return false;
}
@ -920,7 +1010,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
std::cerr << "SHA1_NGCFT1 error: ngcft1 requested data for unknown transfer\n";
// not ours
//std::cerr << "SHA1_NGCFT1 error: ngcft1 requested data for unknown transfer\n";
return false;
}
@ -955,7 +1046,16 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
// TODO: add event to propergate to messages
//_rmm.throwEventUpdate(transfer); // should we?
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
Contact3Handle c;
const auto tpcc_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
if (tpcc_it != _tox_peer_to_contact.cend()) {
c = tpcc_it->second;
} else {
c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (static_cast<bool>(c)) {
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c;
}
}
if (static_cast<bool>(c)) {
chunk_transfer.content.get_or_emplace<Components::TransferStatsTally>()
.tally[c]
@ -1024,11 +1124,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
o.emplace_or_replace<ObjComp::Ephemeral::File::TagTransferPaused>();
_os.throwEventUpdate(o);
updateMessages(o);
} else if (transfer.isChunk()) {
auto o = transfer.getChunk().content;
const auto& info = o.get<Components::FT1InfoSHA1>();
auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
auto& cc = o.get<Components::FT1ChunkSHA1Cache>(); // is this assumption save?
// HACK: only check first chunk (they *should* all be the same)
const auto chunk_index = transfer.getChunk().chunk_indices.front();
@ -1136,6 +1238,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
}
_os.throwEventUpdate(o);
updateMessages(o); // mostly for received bytes
}
@ -1152,11 +1256,15 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
auto& transfer = _sending_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id);
if (transfer.isChunk()) {
// we could cheat here and assume remote has chunk now
_os.throwEventUpdate(transfer.getChunk().content);
updateMessages(transfer.getChunk().content); // mostly for sent bytes
}
} // ignore info transfer for now
_sending_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id);
return true;
}
@ -1165,6 +1273,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
return false;
}
// TODO: make perms go though contacts
// TODO: consider temporal component? not here, here is now
if (!_tcm.groupPeerCanSpeak(e.group_number, e.peer_number)) {
// peer has not the permission to speak, discard
return false; // return true?
}
uint64_t ts = Message::getTimeMS();
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
@ -1212,60 +1327,12 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
rb.try_emplace(self_c, ts);
}
// check if content exists
const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
ObjectHandle o;
if (_info_to_content.count(sha1_info_hash)) {
o = _info_to_content.at(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has existing content\n";
} else {
// TODO: backend
o = _mfb.newObject(ByteSpan{sha1_info_hash});
_info_to_content[sha1_info_hash] = o;
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has new content\n";
auto o = constructFileMessageInPlace({reg, new_msg_e}, e.file_kind, {e.file_id, e.file_id_size});
if (o) {
// the other peer just sent the file message, so it is likely they have the file
addParticipation(c, o);
}
o.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
reg_ptr->emplace<Message::Components::MessageFileObject>(new_msg_e, o);
// HACK: assume the message sender is participating. usually a safe bet.
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
// HACK: assume the message sender has all
o.get_or_emplace<Components::RemoteHaveBitset>().others[c] = {true, {}};
// TODO: queue info dl
// TODO: queue info/check if we already have info
if (!o.all_of<Components::ReRequestInfoTimer>() && !o.all_of<Components::FT1InfoSHA1>()) {
bool in_info_want {false};
for (const auto it : _queue_content_want_info) {
if (it == o) {
in_info_want = true;
break;
}
}
if (!in_info_want) {
// TODO: check if already receiving
_queue_content_want_info.push_back(o);
}
} else if (o.all_of<Components::FT1InfoSHA1>()){
// remove from info want
o.remove<Components::ReRequestInfoTimer>();
auto it = std::find(_queue_content_want_info.cbegin(), _queue_content_want_info.cend(), o);
if (it != _queue_content_want_info.cend()) {
_queue_content_want_info.erase(it);
}
}
// since public
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
_os.throwEventUpdate(o);
_rmm.throwEventConstruct(reg, new_msg_e);
@ -1340,8 +1407,8 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
for (const auto& [_, o] : _info_to_content) {
removeParticipation(c, o);
if (o.all_of<Components::RemoteHaveBitset>()) {
o.get<Components::RemoteHaveBitset>().others.erase(c);
if (o.all_of<ObjComp::F::RemoteHaveBitset>()) {
o.get<ObjComp::F::RemoteHaveBitset>().others.erase(c);
}
}
}
@ -1383,11 +1450,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
return false;
}
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
assert(static_cast<bool>(c));
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
// we might not know yet
if (addParticipation(c, o)) {
@ -1395,10 +1460,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
//c.emplace_or_replace<ChunkPickerUpdateTag>();
}
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
if (!o.all_of<Components::FT1InfoSHA1>()) {
// we dont have the info yet
return true;
}
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
auto& remote_have = o.get_or_emplace<ObjComp::F::RemoteHaveBitset>().others;
if (!remote_have.contains(c)) {
// init
remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
remote_have.emplace(c, ObjComp::F::RemoteHaveBitset::Entry{false, num_total_chunks});
// new have? nice
//c.emplace_or_replace<ChunkPickerUpdateTag>();
@ -1424,11 +1496,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
a_valid_change = true;
}
if (a_valid_change) {
// new have? nice
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
// check for completion?
// TODO: optimize
bool test_all {true};
@ -1445,6 +1512,15 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
remote_have_peer.have = BitSet{};
}
if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
return true; // we dont care beyond this point
}
if (a_valid_change) {
// new have? nice
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
return true;
}
@ -1474,6 +1550,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
std::cerr << "SHA1_NGCFT1 error: tracking info has null object\n";
return false;
}
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
assert(static_cast<bool>(c));
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
// we might not know yet
addParticipation(c, o);
if (!o.all_of<Components::FT1InfoSHA1>()) {
// we dont have the info yet
return true;
}
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
// +7 for byte rounding
@ -1483,22 +1570,16 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
return false;
}
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
assert(static_cast<bool>(c));
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
// we might not know yet
addParticipation(c, o);
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
auto& remote_have = o.get_or_emplace<ObjComp::F::RemoteHaveBitset>().others;
if (!remote_have.contains(c)) {
// init
remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
remote_have.emplace(c, ObjComp::F::RemoteHaveBitset::Entry{false, num_total_chunks});
}
auto& remote_have_peer = remote_have.at(c);
if (!remote_have_peer.have_all) { // TODO: maybe unset with bitset?
BitSet event_bitset{e.chunk_bitset};
// TODO: range replace instead
remote_have_peer.have.merge(event_bitset, e.start_chunk);
// check for completion?
@ -1548,13 +1629,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have_all& e) {
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
assert(static_cast<bool>(c));
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
// we might not know yet
addParticipation(c, o);
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
remote_have[c] = Components::RemoteHaveBitset::Entry{true, {}};
auto& remote_have = o.get_or_emplace<ObjComp::F::RemoteHaveBitset>().others;
remote_have[c] = ObjComp::F::RemoteHaveBitset::Entry{true, {}};
// new have? nice
c.emplace_or_replace<ChunkPickerUpdateTag>();
@ -1595,21 +1676,27 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) {
// add to participants
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
auto o = itc_it->second;
if (addParticipation(c, o)) {
// something happend, update chunk picker
// !!! this is probably too much
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
std::cout << "SHA1_NGCFT1: and we where interested!\n";
// we should probably send the bitset back here / add to queue (can be multiple packets)
if (o.all_of<Components::FT1ChunkSHA1Cache>() && o.get<Components::FT1ChunkSHA1Cache>().have_count > 0) {
queueBitsetSendFull(c, o);
} else if (o.all_of<Components::ReAnnounceTimer>()) {
o.get<Components::ReAnnounceTimer>().lower();
}
}
// return true instead?
return false;
}

View File

@ -37,6 +37,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
Backends::SHA1MappedFilesystem _mfb;
bool _object_update_lock {false};
std::minstd_rand _rng {1337*11};
using clock = std::chrono::steady_clock;
@ -71,9 +73,12 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
std::deque<QBitsetEntry> _queue_send_bitset;
// FIXME: workaround missing contact events
// only used to remove participation on peer exit
// only used on peer exit (no, also used to quicken lookups)
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
// reset every iterate; kept here as an allocation optimization
entt::dense_map<Contact3, size_t> _peer_open_requests;
void updateMessages(ObjectHandle ce);
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ObjectHandle ce);
@ -104,6 +109,9 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts);
// construct the file part in a partially constructed message
ObjectHandle constructFileMessageInPlace(Message3Handle msg, NGCFT1_file_kind file_kind, ByteSpan file_id);
protected: // rmm events (actions)
bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override;

View File

@ -1,91 +0,0 @@
#include "./ngc_hs2.hpp"
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
NGCHS2::NGCHS2(
ToxContactModel2& tcm,
ToxEventProviderI& tep,
NGCFT1& nft
) :
_tcm(tcm),
_tep_sr(tep.newSubRef(this)),
_nft(nft),
_nftep_sr(_nft.newSubRef(this))
{
_tep_sr
.subscribe(TOX_EVENT_GROUP_PEER_JOIN)
.subscribe(TOX_EVENT_GROUP_PEER_EXIT)
;
_nftep_sr
.subscribe(NGCFT1_Event::recv_init)
.subscribe(NGCFT1_Event::recv_request)
.subscribe(NGCFT1_Event::recv_init)
.subscribe(NGCFT1_Event::recv_data)
.subscribe(NGCFT1_Event::send_data)
.subscribe(NGCFT1_Event::recv_done)
.subscribe(NGCFT1_Event::send_done)
;
}
NGCHS2::~NGCHS2(void) {
}
float NGCHS2::iterate(float delta) {
return 1000.f;
}
bool NGCHS2::onEvent(const Events::NGCFT1_recv_request& e) {
if (
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
) {
return false; // not for us
}
return false;
}
bool NGCHS2::onEvent(const Events::NGCFT1_recv_init& e) {
if (
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
) {
return false; // not for us
}
return false;
}
bool NGCHS2::onEvent(const Events::NGCFT1_recv_data&) {
return false;
}
bool NGCHS2::onEvent(const Events::NGCFT1_send_data&) {
return false;
}
bool NGCHS2::onEvent(const Events::NGCFT1_recv_done&) {
return false;
}
bool NGCHS2::onEvent(const Events::NGCFT1_send_done&) {
return false;
}
bool NGCHS2::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
const auto group_number = tox_event_group_peer_join_get_group_number(e);
const auto peer_number = tox_event_group_peer_join_get_peer_id(e);
const auto c = _tcm.getContactGroupPeer(group_number, peer_number);
assert(c);
// add to check list with inital cooldown
return false;
}
bool NGCHS2::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
return false;
}

View File

@ -1,44 +0,0 @@
#pragma once
//#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/toxcore/tox_event_interface.hpp>
//#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
// fwd
class ToxContactModel2;
class NGCHS2 : public ToxEventI, public NGCFT1EventI {
ToxContactModel2& _tcm;
//Contact3Registry& _cr;
//RegistryMessageModelI& _rmm;
ToxEventProviderI::SubscriptionReference _tep_sr;
NGCFT1& _nft;
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
public:
NGCHS2(
ToxContactModel2& tcm,
ToxEventProviderI& tep,
NGCFT1& nf
);
~NGCHS2(void);
float iterate(float delta);
protected:
bool onEvent(const Events::NGCFT1_recv_request&) override;
bool onEvent(const Events::NGCFT1_recv_init&) override;
bool onEvent(const Events::NGCFT1_recv_data&) override;
bool onEvent(const Events::NGCFT1_send_data&) override;
bool onEvent(const Events::NGCFT1_recv_done&) override;
bool onEvent(const Events::NGCFT1_send_done&) override;
protected:
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;
};

View File

@ -0,0 +1,532 @@
#include "./ngc_hs2_rizzler.hpp"
#include <solanaceae/contact/components.hpp>
#include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
#include <solanaceae/message3/contact_components.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/message3/components.hpp>
#include <solanaceae/tox_messages/msg_components.hpp>
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
// TODO: move somewhere else?
#include <solanaceae/ngc_ft1_sha1/util.hpp>
#include <solanaceae/util/span.hpp>
#include <entt/entity/entity.hpp>
#include <nlohmann/json.hpp>
#include "./serl.hpp"
#include <cstdint>
#include <deque>
#include <cstring>
#include <iostream>
// TODO: move to own file
namespace Components {
struct RequestedChatLogs {
struct Entry {
uint64_t ts_start;
uint64_t ts_end;
//std::vector<uint8_t> fid; // ?
};
std::deque<Entry> list;
bool contains(uint64_t ts_start, uint64_t ts_end);
void addRequest(uint64_t ts_start, uint64_t ts_end);
};
struct RunningChatLogs {
struct Entry {
uint64_t ts_start;
uint64_t ts_end;
std::vector<uint8_t> data;
float last_activity {0.f};
};
// list of transfers
entt::dense_map<uint8_t, Entry> list;
};
bool RequestedChatLogs::contains(uint64_t ts_start, uint64_t ts_end) {
auto it = std::find_if(list.cbegin(), list.cend(), [ts_start, ts_end](const auto& value) {
return value.ts_start == ts_start && value.ts_end == ts_end;
});
return it != list.cend();
}
void RequestedChatLogs::addRequest(uint64_t ts_start, uint64_t ts_end) {
if (contains(ts_start, ts_end)) {
return; // pre existing
}
list.push_back(Entry{ts_start, ts_end});
}
} // Components
// TODO: move to contact reg?
static Contact3 findContactByID(Contact3Registry& cr, const std::vector<uint8_t>& id) {
// TODO: id lookup table, this is very inefficent
for (const auto& [c_it, id_it] : cr.view<Contact::Components::ID>().each()) {
if (id == id_it.data) {
return c_it;
}
}
return entt::null;
}
NGCHS2Rizzler::NGCHS2Rizzler(
Contact3Registry& cr,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
NGCFT1& nft,
ToxEventProviderI& tep,
SHA1_NGCFT1& sha1_nft
) :
_cr(cr),
_rmm(rmm),
_tcm(tcm),
_nft(nft),
_nftep_sr(_nft.newSubRef(this)),
_tep_sr(tep.newSubRef(this)),
_sha1_nft(sha1_nft)
{
_nftep_sr
.subscribe(NGCFT1_Event::recv_init)
.subscribe(NGCFT1_Event::recv_data)
.subscribe(NGCFT1_Event::recv_done)
;
_tep_sr
.subscribe(Tox_Event_Type::TOX_EVENT_GROUP_PEER_JOIN)
;
}
NGCHS2Rizzler::~NGCHS2Rizzler(void) {
}
float NGCHS2Rizzler::iterate(float delta) {
for (auto it = _request_queue.begin(); it != _request_queue.end();) {
it->second.timer += delta;
if (it->second.timer < it->second.delay) {
it++;
continue;
}
const Contact3Handle c {_cr, it->first};
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
// peer nolonger online
it = _request_queue.erase(it);
continue;
}
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
// now in sec
const uint64_t ts_now = Message::getTimeMS()/1000;
const uint64_t ts_start = ts_now;
const uint64_t ts_end = ts_now-(60*60*48);
if (sendRequest(group_number, peer_number, ts_start, ts_end)) {
// TODO: requeue
// TODO: segment
// TODO: dont request already received ranges
//// on success, requeue with longer delay (minutes)
//it->second.timer = 0.f;
//it->second.delay = _delay_next_request_min + _rng_dist(_rng)*_delay_next_request_add;
//// double the delay for overlap (9m-15m)
//// TODO: finetune
//it->second.sync_delta = uint8_t((it->second.delay/60.f)*2.f) + 1;
//std::cout << "ZOX #### requeued request in " << it->second.delay << "s\n";
auto& rcl = c.get_or_emplace<Components::RequestedChatLogs>();
rcl.addRequest(ts_start, ts_end);
} else {
// on failure, assume disconnected
}
// remove from request queue
it = _request_queue.erase(it);
}
return 1000.f;
}
bool NGCHS2Rizzler::sendRequest(
uint32_t group_number, uint32_t peer_number,
uint64_t ts_start, uint64_t ts_end
) {
std::cout << "NGCHS2Rizzler: sending request to " << group_number << ":" << peer_number << " (" << ts_start << "," << ts_end << ")\n";
// build fid
std::vector<uint8_t> fid;
fid.reserve(sizeof(uint64_t)+sizeof(uint64_t));
serlSimpleType(fid, ts_start);
serlSimpleType(fid, ts_end);
assert(fid.size() == sizeof(uint64_t)+sizeof(uint64_t));
return _nft.NGC_FT1_send_request_private(
group_number, peer_number,
(uint32_t)NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK,
fid.data(), fid.size() // fid
);
}
void NGCHS2Rizzler::handleMsgPack(Contact3Handle sync_by_c, const std::vector<uint8_t>& data) {
assert(sync_by_c);
auto* reg_ptr = _rmm.get(sync_by_c);
if (reg_ptr == nullptr) {
std::cerr << "NGCHS2Rizzler error: group without msg reg\n";
return;
}
Message3Registry& reg = *reg_ptr;
uint64_t now_ts = Message::getTimeMS();
std::cout << "NGCHS2Rizzler: start parsing msgpack chatlog from " << entt::to_integral(sync_by_c.entity()) << "\n";
try {
const auto j = nlohmann::json::from_msgpack(data);
if (!j.is_array()) {
std::cerr << "NGCHS2Rizzler error: chatlog not array\n";
return;
}
std::cout << "NGCHS2Rizzler: chatlog has " << j.size() << " entries\n";
for (const auto j_entry : j) {
try {
// deci seconds
uint64_t ts = j_entry.at("ts");
// TODO: check against ts range
ts *= 100; // convert to ms
const auto& j_ppk = j_entry.at("ppk");
uint32_t mid = j_entry.at("mid");
if (
!(j_entry.count("text")) &&
!(j_entry.count("fkind") && j_entry.count("fid"))
) {
std::cerr << "NGCHS2Rizzler error: msg neither contains text nor file fields\n";
continue;
}
Contact3 from_c{entt::null};
{ // from_c
std::vector<uint8_t> id;
if (j_ppk.is_binary()) {
id = j_ppk.get_binary();
} else {
j_ppk.at("bytes").get_to(id);
}
from_c = findContactByID(_cr, id);
if (!_cr.valid(from_c)) {
// create sparse contact with id only
from_c = _cr.create();
_cr.emplace_or_replace<Contact::Components::ID>(from_c, id);
// TODO: only if public message
_cr.emplace_or_replace<Contact::Components::Parent>(from_c, sync_by_c.get<Contact::Components::Parent>().parent);
}
}
// TODO: from_c perm check
// hard to do without numbers
Message3Handle new_real_msg{reg, reg.create()};
new_real_msg.emplace<Message::Components::Timestamp>(ts); // reactive?
new_real_msg.emplace<Message::Components::ContactFrom>(from_c);
new_real_msg.emplace<Message::Components::ContactTo>(sync_by_c.get<Contact::Components::Parent>().parent);
new_real_msg.emplace<Message::Components::ToxGroupMessageID>(mid);
if (j_entry.contains("action") && static_cast<bool>(j_entry.at("action"))) {
new_real_msg.emplace<Message::Components::TagMessageIsAction>();
}
if (j_entry.contains("text")) {
const std::string& text = j_entry.at("text");
new_real_msg.emplace<Message::Components::MessageText>(text);
#if 0
std::cout
<< "msg ts:" << ts
//<< " ppk:" << j_ppk
<< " mid:" << mid
<< " type:" << type
<< " text:" << text
<< "\n"
;
#endif
} else if (j_entry.contains("fkind") && j_entry.contains("fid")) {
uint32_t fkind = j_entry.at("fkind");
const auto& j_fid = j_entry.at("fid");
std::vector<uint8_t> fid;
if (j_fid.is_binary()) {
fid = j_fid.get_binary();
} else {
j_fid.at("bytes").get_to(fid);
}
if (fkind == (uint32_t)NGCFT1_file_kind::HASH_SHA1_INFO) {
_sha1_nft.constructFileMessageInPlace(
new_real_msg,
NGCFT1_file_kind::HASH_SHA1_INFO,
ByteSpan{fid}
);
} else {
std::cerr << "NGCHS2Rizzler error: unknown file kind " << fkind << "\n";
}
#if 0
std::cout
<< "msg ts:" << ts
//<< " ppk:" << j_ppk
<< " mid:" << mid
<< " type:" << type
<< " fkind:" << fkind
<< " fid:" << j_fid
<< "\n"
;
#endif
}
// now check against pre existing
// TODO: dont do this afterwards
Message3Handle dup_msg{};
{ // check preexisting
// get comparator from contact
const Contact3Handle reg_c {_cr, reg.ctx().get<Contact3>()};
if (reg_c.all_of<Contact::Components::MessageIsSame>()) {
auto& comp = reg_c.get<Contact::Components::MessageIsSame>().comp;
// walking EVERY existing message OOF
// this needs optimizing
for (const Message3 other_msg : reg.view<Message::Components::Timestamp, Message::Components::ContactFrom, Message::Components::ContactTo>()) {
if (other_msg == new_real_msg) {
continue; // skip self
}
if (comp({reg, other_msg}, new_real_msg)) {
// dup
dup_msg = {reg, other_msg};
break;
}
}
} // else, default heuristic??
}
Message3Handle new_msg = new_real_msg;
if (dup_msg) {
// we leak objects here (if file)
reg.destroy(new_msg);
new_msg = dup_msg;
}
{ // by whom
auto& synced_by = new_msg.get_or_emplace<Message::Components::SyncedBy>().ts;
// dont overwrite
synced_by.try_emplace(sync_by_c, now_ts);
}
{ // now we also know they got the message
auto& list = new_msg.get_or_emplace<Message::Components::ReceivedBy>().ts;
// dont overwrite
list.try_emplace(sync_by_c, now_ts);
}
if (new_msg == dup_msg) {
// TODO: maybe update a timestamp?
_rmm.throwEventUpdate(reg, new_msg);
} else {
// pure new msg
new_msg.emplace<Message::Components::TimestampProcessed>(now_ts);
new_msg.emplace<Message::Components::TimestampWritten>(ts);
new_msg.emplace<Message::Components::TagUnread>();
_rmm.throwEventConstruct(reg, new_msg);
}
} catch (...) {
std::cerr << "NGCHS2Rizzler error: parsing entry '" << j_entry.dump() << "'\n";
}
}
} catch (...) {
std::cerr << "NGCHS2Rizzler error: failed parsing data as msgpack\n";
}
}
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_init& e) {
if (e.file_kind != NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK) {
return false; // not for us
}
std::cout << "NGCHS2Rizzler: recv_init " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (!c) {
return false; // huh?
}
if (!c.all_of<Components::RequestedChatLogs>()) {
return false;
}
// parse start end
// TODO: extract
ByteSpan fid{e.file_id, e.file_id_size};
// TODO: better size check
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
std::cerr << "NGCHS2S error: range not lange enough\n";
return true;
}
// seconds
uint64_t ts_start{0};
uint64_t ts_end{0};
// parse
try {
ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)};
ts_start = deserlTS(ts_start_bytes);
ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)};
ts_end = deserlTS(ts_end_bytes);
} catch (...) {
std::cerr << "NGCHS2R error: failed to parse range\n";
return true;
}
if (ts_end >= ts_start) {
std::cerr << "NGCHS2R error: end not < start\n";
return true;
}
auto& reqcl = c.get<Components::RequestedChatLogs>();
if (!reqcl.contains(ts_start, ts_end)) {
// warn?
return true;
}
auto& rnncl = c.get_or_emplace<Components::RunningChatLogs>();
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
auto& transfer = rnncl.list[e.transfer_id];
transfer.data.reserve(e.file_size); // danger?
transfer.last_activity = 0.f;
transfer.ts_start = ts_start;
transfer.ts_end = ts_end;
e.accept = true;
return true;
}
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_data& e) {
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (!c) {
return false;
}
if (!c.all_of<Components::RunningChatLogs>()) {
return false; // not ours
}
auto& rnncl = c.get<Components::RunningChatLogs>();
if (!rnncl.list.count(e.transfer_id)) {
return false; // not ours
}
std::cout << "NGCHS2Rizzler: recv_data " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << " " << e.data_size << "@" << e.data_offset << "\n";
auto& transfer = rnncl.list.at(e.transfer_id);
transfer.data.resize(e.data_offset+e.data_size);
std::memcpy(&transfer.data[e.data_offset], e.data, e.data_size);
transfer.last_activity = 0.f;
return true;
}
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_done& e) {
// FIXME: this does not work, tcm just delteded the relation ship
//auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
//if (!c) {
// return false;
//}
const auto c_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
if (c_it == _tox_peer_to_contact.end()) {
return false;
}
auto c = c_it->second;
if (!static_cast<bool>(c)) {
return false;
}
if (!c.all_of<Components::RunningChatLogs>()) {
return false; // not ours
}
auto& rnncl = c.get<Components::RunningChatLogs>();
if (!rnncl.list.count(e.transfer_id)) {
return false; // not ours
}
std::cout << "NGCHS2Rizzler: recv_done " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
{
auto& transfer = rnncl.list.at(e.transfer_id);
// TODO: done might mean failed, so we might be parsing bs here
// use data
// TODO: move out of packet handler
handleMsgPack(c, transfer.data);
}
rnncl.list.erase(e.transfer_id);
return true;
}
bool NGCHS2Rizzler::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
const auto group_number = tox_event_group_peer_join_get_group_number(e);
const auto peer_number = tox_event_group_peer_join_get_peer_id(e);
const auto c = _tcm.getContactGroupPeer(group_number, peer_number);
if (!c) {
return false;
}
if (!_request_queue.count(c)) {
_request_queue[c] = {
_delay_before_first_request_min + _rng_dist(_rng)*_delay_before_first_request_add,
0.f,
0,
};
}
return false;
}

View File

@ -0,0 +1,73 @@
#pragma once
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/toxcore/tox_event_interface.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
#include <solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp>
// fwd
class ToxContactModel2;
class RegistryMessageModelI;
class NGCHS2Rizzler : public ToxEventI, public NGCFT1EventI {
Contact3Registry& _cr;
RegistryMessageModelI& _rmm;
ToxContactModel2& _tcm;
NGCFT1& _nft;
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
ToxEventProviderI::SubscriptionReference _tep_sr;
SHA1_NGCFT1& _sha1_nft;
// 5s-6s
const float _delay_before_first_request_min {5.f};
const float _delay_before_first_request_add {1.f};
std::uniform_real_distribution<float> _rng_dist {0.0f, 1.0f};
std::minstd_rand _rng;
struct RequestQueueInfo {
float delay; // const
float timer;
uint64_t sync_delta; //?
};
// request queue
// c -> delay, timer
std::map<Contact3, RequestQueueInfo> _request_queue;
// FIXME: workaround missing contact events
// only used on peer exit (no, also used to quicken lookups)
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
public:
NGCHS2Rizzler(
Contact3Registry& cr,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
NGCFT1& nft,
ToxEventProviderI& tep,
SHA1_NGCFT1& sha1_nft
);
~NGCHS2Rizzler(void);
float iterate(float delta);
protected:
bool sendRequest(
uint32_t group_number, uint32_t peer_number,
uint64_t ts_start, uint64_t ts_end
);
void handleMsgPack(Contact3Handle c, const std::vector<uint8_t>& data);
protected:
bool onEvent(const Events::NGCFT1_recv_init&) override;
bool onEvent(const Events::NGCFT1_recv_data&) override;
bool onEvent(const Events::NGCFT1_recv_done&) override;
protected:
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
};

View File

@ -0,0 +1,460 @@
#include "./ngc_hs2_sigma.hpp"
#include <solanaceae/util/span.hpp>
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
#include <solanaceae/contact/components.hpp>
#include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/message3/components.hpp>
#include <solanaceae/tox_messages/msg_components.hpp>
//#include <solanaceae/tox_messages/obj_components.hpp>
// TODO: this is kinda bad, needs improvement
// use tox fileid/filekind instead !
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
#include <solanaceae/ngc_ft1_sha1/components.hpp>
// TODO: move somewhere else?
#include <solanaceae/ngc_ft1_sha1/util.hpp>
#include <nlohmann/json.hpp>
#include "./serl.hpp"
#include "./ts_find_start.hpp"
#include <iostream>
// https://www.youtube.com/watch?v=AdAqsgga3qo
// TODO: move to own file
namespace Components {
struct IncommingTimeRangeRequestQueue {
struct Entry {
TimeRangeRequest ir;
std::vector<uint8_t> fid;
};
std::deque<Entry> _queue;
// we should remove/notadd queued requests
// that are subsets of same or larger ranges
void queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid);
};
struct IncommingTimeRangeRequestRunning {
struct Entry {
TimeRangeRequest ir;
std::vector<uint8_t> data; // transfer data in memory
float last_activity {0.f};
};
entt::dense_map<uint8_t, Entry> _list;
};
void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid) {
// TODO: do more than exact dedupe
for (const auto& [time_range, _] : _queue) {
if (time_range.ts_start == new_request.ts_start && time_range.ts_end == new_request.ts_end) {
return; // already enqueued
// TODO: what about fid?
}
}
_queue.emplace_back(Entry{
new_request,
std::vector<uint8_t>{fid.cbegin(), fid.cend()}
});
}
} // Components
NGCHS2Sigma::NGCHS2Sigma(
Contact3Registry& cr,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
NGCFT1& nft
) :
_cr(cr),
_rmm(rmm),
_tcm(tcm),
_nft(nft),
_nftep_sr(_nft.newSubRef(this))
{
_nftep_sr
.subscribe(NGCFT1_Event::recv_request)
.subscribe(NGCFT1_Event::send_data)
.subscribe(NGCFT1_Event::send_done)
;
}
NGCHS2Sigma::~NGCHS2Sigma(void) {
}
float NGCHS2Sigma::iterate(float delta) {
// limit how often we update here (new fts usually)
if (_iterate_heat > 0.f) {
_iterate_heat -= delta;
return 1000.f; // return heat?
} else {
_iterate_heat = _iterate_cooldown;
}
// work request queue
// check if already running, discard
auto fn_iirq = [this](auto&& view) {
for (auto&& [cv, iirq] : view.each()) {
if (iirq._queue.empty()) {
// TODO: remove comp?
continue;
}
Contact3Handle c{_cr, cv};
auto& iirr = c.get_or_emplace<Components::IncommingTimeRangeRequestRunning>();
// dedup queued from running
if (iirr._list.size() >= _max_parallel_per_peer) {
continue;
}
// new ft here
// TODO: loop? nah just 1 per tick is enough
const auto request_entry = iirq._queue.front(); // copy
assert(!request_entry.fid.empty());
if (!c.all_of<Contact::Components::Parent>()) {
iirq._queue.pop_front();
continue; // how
}
const Contact3Handle group_c = {*c.registry(), c.get<Contact::Components::Parent>().parent};
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
iirq._queue.pop_front();
continue;
}
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
_tox_peer_to_contact[combine_ids(group_number, peer_number)] = c; // cache
// TODO: check allowed range here
//_max_time_into_past_default
// potentially heavy op
auto data = buildChatLogFileRange(group_c, request_entry.ir.ts_start, request_entry.ir.ts_end);
uint8_t transfer_id {0};
if (!_nft.NGC_FT1_send_init_private(
group_number, peer_number,
(uint32_t)NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK,
request_entry.fid.data(), request_entry.fid.size(),
data.size(),
&transfer_id,
true // can_compress (does nothing rn)
)) {
// sending failed, we do not pop but wait for next iterate
// TODO: cache data
// TODO: fail counter
// actually, fail probably means offline, so delete?
continue;
}
assert(iirr._list.count(transfer_id) == 0);
iirr._list[transfer_id] = {request_entry.ir, data};
iirq._queue.pop_front();
}
};
// first handle range requests on weak self
fn_iirq(_cr.view<Components::IncommingTimeRangeRequestQueue, Contact::Components::TagSelfWeak>());
// we could stop here, if too much is already running
// then range on others
fn_iirq(_cr.view<Components::IncommingTimeRangeRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
_cr.view<Components::IncommingTimeRangeRequestRunning>().each(
[delta](const auto cv, Components::IncommingTimeRangeRequestRunning& irr) {
std::vector<uint8_t> to_remove;
for (auto&& [ft_id, entry] : irr._list) {
entry.last_activity += delta;
if (entry.last_activity >= 60.f) {
to_remove.push_back(ft_id);
}
}
for (const auto it : to_remove) {
std::cout << "NGCHS2Sigma warning: timed out ." << (int)it << "\n";
// TODO: need a way to tell ft?
irr._list.erase(it);
// technically we are not supposed to timeout and instead rely on the done event
}
}
);
return 1000.f;
}
void NGCHS2Sigma::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
ByteSpan fid{e.file_id, e.file_id_size};
// TODO: better size check
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
std::cerr << "NGCHS2S error: range not lange enough\n";
return;
}
// seconds
uint64_t ts_start{0};
uint64_t ts_end{0};
// parse
try {
ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)};
ts_start = deserlTS(ts_start_bytes);
ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)};
ts_end = deserlTS(ts_end_bytes);
} catch (...) {
std::cerr << "NGCHS2S error: failed to parse range\n";
return;
}
if (ts_end >= ts_start) {
std::cerr << "NGCHS2S error: end not < start\n";
return;
}
// dedupe insert into queue
// how much overlap do we allow?
c.get_or_emplace<Components::IncommingTimeRangeRequestQueue>().queueRequest(
{ts_start, ts_end},
fid
);
}
std::vector<uint8_t> NGCHS2Sigma::buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end) {
const Message3Registry* reg_ptr = static_cast<const RegistryMessageModelI&>(_rmm).get(c);
if (reg_ptr == nullptr) {
return {};
}
const Message3Registry& msg_reg = *reg_ptr;
if (msg_reg.storage<Message::Components::Timestamp>() == nullptr) {
// nothing to do here
return {};
}
std::cout << "NGCHS2Sigma: building chatlog for time range " << ts_start-ts_end << "s\n";
// convert seconds to milliseconds
// TODO: lift out?
ts_start *= 1000;
ts_end *= 1000;
//std::cout << "!!!! starting msg ts search, ts_start:" << ts_start << " ts_end:" << ts_end << "\n";
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
// we iterate "forward", so from newest to oldest
// start is the newest ts
const auto ts_start_it = find_start_by_ts(ts_view, ts_start);
// end is the oldest ts
// we only search for the start point, because we walk to the end anyway
auto j_array = nlohmann::json::array_t{};
// hmm
// maybe use other view or something?
for (auto it = ts_start_it; it != ts_view.end(); it++) {
const auto e = *it;
const auto& [ts_comp] = ts_view.get(e);
if (ts_comp.ts > ts_start) {
std::cerr << "!!!! msg ent in view too new\n";
continue;
} else if (ts_comp.ts < ts_end) {
// too old, we hit the end of the range
break;
}
if (!msg_reg.all_of<
Message::Components::ContactFrom,
Message::Components::ContactTo,
Message::Components::ToxGroupMessageID
>(e)) {
continue; // ??
}
if (!msg_reg.any_of<Message::Components::MessageText, Message::Components::MessageFileObject>(e)) {
continue; // skip
}
const auto& [c_from_c, c_to_c] = msg_reg.get<Message::Components::ContactFrom, Message::Components::ContactTo>(e);
if (c_to_c.c != c) {
// message was not public
continue;
}
if (!_cr.valid(c_from_c.c)) {
continue; // ???
}
Contact3Handle c_from{_cr, c_from_c.c};
if (!c_from.all_of<Contact::Components::ToxGroupPeerPersistent>()) {
continue; // ???
}
if (_only_send_self_observed && msg_reg.all_of<Message::Components::SyncedBy>(e) && c.all_of<Contact::Components::Self>()) {
if (!msg_reg.get<Message::Components::SyncedBy>(e).ts.count(c.get<Contact::Components::Self>().self)) {
continue; // did not observe ourselfs, skip
}
}
auto j_entry = nlohmann::json::object_t{};
j_entry["ts"] = ts_comp.ts/100; // millisec -> decisec
{
const auto& ppk_ref = c_from.get<Contact::Components::ToxGroupPeerPersistent>().peer_key.data;
j_entry["ppk"] = nlohmann::json::binary_t{std::vector<uint8_t>{ppk_ref.cbegin(), ppk_ref.cend()}};
}
j_entry["mid"] = msg_reg.get<Message::Components::ToxGroupMessageID>(e).id;
if (msg_reg.all_of<Message::Components::TagMessageIsAction>(e)) {
j_entry["action"] = true;
}
if (msg_reg.all_of<Message::Components::MessageText>(e)) {
j_entry["text"] = msg_reg.get<Message::Components::MessageText>(e).text;
} else if (msg_reg.any_of<Message::Components::MessageFileObject>(e)) {
const auto& o = msg_reg.get<Message::Components::MessageFileObject>(e).o;
if (!o) {
continue;
}
// HACK: use tox fild_id and file_kind instead!!
if (o.all_of<Components::FT1InfoSHA1Hash>()) {
j_entry["fkind"] = NGCFT1_file_kind::HASH_SHA1_INFO;
j_entry["fid"] = nlohmann::json::binary_t{o.get<Components::FT1InfoSHA1Hash>().hash};
} else {
continue; // unknown file type
}
}
j_array.push_back(j_entry);
}
std::cout << "NGCHS2Sigma: built chat log with " << j_array.size() << " entries\n";
return nlohmann::json::to_msgpack(j_array);
}
bool NGCHS2Sigma::onEvent(const Message::Events::MessageConstruct&) {
return false;
}
bool NGCHS2Sigma::onEvent(const Message::Events::MessageUpdated&) {
return false;
}
bool NGCHS2Sigma::onEvent(const Message::Events::MessageDestory&) {
return false;
}
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_recv_request& e) {
if (
e.file_kind != NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK
) {
return false; // not for us
}
// TODO: when is it done from queue?
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (!c) {
return false; // how
}
// is other peer allowed to make requests
//bool quick_allow {false};
bool quick_allow {true}; // HACK: disable all restrictions for this early test
// TODO: quick deny?
{
// - tagged as weakself
if (!quick_allow && c.all_of<Contact::Components::TagSelfWeak>()) {
quick_allow = true;
}
// - sub perm level??
// - out of max time range (ft specific, not a quick_allow)
}
if (e.file_kind == NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK) {
handleTimeRange(c, e);
}
return true;
}
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_send_data& e) {
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (!c) {
return false;
}
if (!c.all_of<Components::IncommingTimeRangeRequestRunning>()) {
return false;
}
auto& irr = c.get<Components::IncommingTimeRangeRequestRunning>();
if (!irr._list.count(e.transfer_id)) {
return false; // not for us (maybe)
}
auto& transfer = irr._list.at(e.transfer_id);
if (transfer.data.size() < e.data_offset+e.data_size) {
std::cerr << "NGCHS2Sigma error: ft send data larger then file???\n";
assert(false && "how");
}
std::memcpy(e.data, transfer.data.data()+e.data_offset, e.data_size);
transfer.last_activity = 0.f;
return true;
}
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_send_done& e) {
// TODO: this will return null if the peer just disconnected
// FIXME: this does not work, tcm just delteded the relation ship
//auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
//if (!c) {
// return false;
//}
const auto c_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
if (c_it == _tox_peer_to_contact.end()) {
return false;
}
auto c = c_it->second;
if (!static_cast<bool>(c)) {
return false;
}
if (!c.all_of<Components::IncommingTimeRangeRequestRunning>()) {
return false;
}
auto& irr = c.get<Components::IncommingTimeRangeRequestRunning>();
if (!irr._list.count(e.transfer_id)) {
return false; // not for us (maybe)
}
irr._list.erase(e.transfer_id);
// TODO: check if we completed it
std::cout << "NGCHS2Sigma: sent chatlog to " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
return true;
}

View File

@ -0,0 +1,82 @@
#pragma once
#include <solanaceae/toxcore/tox_event_interface.hpp>
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
#include <entt/container/dense_map.hpp>
#include <solanaceae/util/span.hpp>
#include <vector>
#include <deque>
// fwd
class ToxContactModel2;
struct TimeRangeRequest {
uint64_t ts_start{0};
uint64_t ts_end{0};
};
class NGCHS2Sigma : public RegistryMessageModelEventI, public NGCFT1EventI {
Contact3Registry& _cr;
RegistryMessageModelI& _rmm;
ToxContactModel2& _tcm;
NGCFT1& _nft;
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
float _iterate_heat {0.f};
constexpr static float _iterate_cooldown {1.22f}; // sec
// open/running range requests (by c)
// comp on peer c
// open/running range responses (by c)
// comp on peer c
// limit to 2 uploads per peer simultaniously
// TODO: increase for prod (4?) or maybe even lower?
// currently per type
constexpr static size_t _max_parallel_per_peer {2};
constexpr static bool _only_send_self_observed {true};
constexpr static int64_t _max_time_into_past_default {60*15}; // s
// FIXME: workaround missing contact events
// only used on peer exit (no, also used to quicken lookups)
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
public:
NGCHS2Sigma(
Contact3Registry& cr,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
NGCFT1& nft
);
~NGCHS2Sigma(void);
float iterate(float delta);
void handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request&);
// msg reg contact
// time ranges
[[nodiscard]] std::vector<uint8_t> buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end);
protected:
bool onEvent(const Message::Events::MessageConstruct&) override;
bool onEvent(const Message::Events::MessageUpdated&) override;
bool onEvent(const Message::Events::MessageDestory&) override;
protected:
bool onEvent(const Events::NGCFT1_recv_request&) override;
bool onEvent(const Events::NGCFT1_send_data&) override;
bool onEvent(const Events::NGCFT1_send_done&) override;
};

View File

@ -0,0 +1,32 @@
#pragma once
#include <solanaceae/util/span.hpp>
#include <cstdint>
template<typename Type>
static uint64_t deserlSimpleType(ByteSpan bytes) {
if (bytes.size < sizeof(Type)) {
throw int(1);
}
Type value{};
for (size_t i = 0; i < sizeof(Type); i++) {
value |= Type(bytes[i]) << (i*8);
}
return value;
}
static uint64_t deserlTS(ByteSpan ts_bytes) {
return deserlSimpleType<uint64_t>(ts_bytes);
}
template<typename Type>
static void serlSimpleType(std::vector<uint8_t>& bytes, const Type& value) {
for (size_t i = 0; i < sizeof(Type); i++) {
bytes.push_back(uint8_t(value >> (i*8) & 0xff));
}
}

View File

@ -0,0 +1,110 @@
# [NGC] Group-History-Sync (v2.1) [PoC] [Draft]
Simple group history sync that uses `timestamp` + `peer public key` + `message_id` (`ts+ppk+mid`) to, mostly, uniquely identify messages and deliver them.
Messages are bundled up in a `msgpack` `array` and sent as a file transfer.
## Requirements
TODO: more?
### Msgpack
For serializing the messages.
### File transfers
For sending packs of messages.
Even a single message can be larger than a single custom packet, so this is a must-have.
This also allows for compression down the road.
## Procedure
Peer A can request `ts+ppk+mid+msg` list for a given time range from peer B.
Peer B then sends a filetransfer (with special file type) of list of `ts+ppk+mid+msg`.
Optionally compressed. (Delta-coding? / zstd)
Peer A keeps doing that until the desired time span is covered.
During all that, peer B usually does the same thing to peer A.
TODO: deny request explicitly. also why (like perms and time range too large etc)
## Traffic savings
It is recomended to remember if a range has been requested and answered from a given peer, to reduce traffic.
While compression is optional, it is recommended.
Timestamps fit delta coding.
Peer keys fit dicts.
Message ids are mostly high entropy.
The Message itself is text, so dict/huffman fits well.
TODO: store the 4 coloms SoA instead of AoS ?
## Message uniqueness
This protocol relies on the randomness of `message_id` and the clocks to be more or less synchronized.
However, `message_id` can be manipulated freely by any peer, this can make messages appear as duplicates.
This can be used here, if you don't wish your messages to be syncronized (to an extent).
## Security
Only sync publicly sent/recieved messages.
Only allow sync or extended time ranges from peers you trust (enough).
The default shall be to not offer any messages.
Indirect messages shall be low in credibility, while direct synced (by author), with mid credibility.
Either only high or mid credibility shall be sent.
Manual exceptions to all can be made at the users discretion, eg for other self owned devices.
## File transfer requests
TODO: is reusing the ft request api a good idea for this?
| fttype | name | content (ft id) |
|------------|------|---------------------|
| 0x00000f02 | time range msgpack | - ts start </br> - ts end |
## File transfer content
| fttype | name | content | note |
|------------|------|----------------------------|---|
| 0x00000f02 | time range msgpack | `message list` in msgpack | |
### time range msgpack
Msgpack array of messages.
```
name | type/size | note
-------------------------|-------------------|-----
- array | 32bit number msgs
- ts | 64bit deciseconds
- ppk | 32bytes
- mid | 16bit
- if action |
- action | bool
- if text |
- text | string | maybe byte array instead?
- if file |
- fkind | 32bit enum | is this right?
- fid | bytes kind | length depends on kind
```
Name is the actual string key.
Data type sizes are suggestions, if not defined by the tox protocol.
## TODO
- [ ] figure out a pro-active approach (instead of waiting for a range request)
- [ ] compression in the ft layer? (would make it reusable) hint/autodetect/autoenable for >1k ?

View File

@ -0,0 +1,82 @@
#include "./ts_find_start.hpp"
#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/message3/components.hpp>
#include <cassert>
int main(void) {
Message3Registry msg_reg;
{
std::cout << "TEST empty reg\n";
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
const auto res = find_start_by_ts(ts_view, 42);
assert(res == ts_view.end());
}
{
std::cout << "TEST single msg newer (fail)\n";
Message3Handle msg{msg_reg, msg_reg.create()};
msg.emplace<Message::Components::Timestamp>(43ul);
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
const auto res = find_start_by_ts(ts_view, 42);
assert(res == ts_view.end());
msg.destroy();
}
{
std::cout << "TEST single msg same (succ)\n";
Message3Handle msg{msg_reg, msg_reg.create()};
msg.emplace<Message::Components::Timestamp>(42ul);
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
const auto res = find_start_by_ts(ts_view, 42);
assert(res != ts_view.end());
msg.destroy();
}
{
std::cout << "TEST single msg older (succ)\n";
Message3Handle msg{msg_reg, msg_reg.create()};
msg.emplace<Message::Components::Timestamp>(41ul);
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
const auto res = find_start_by_ts(ts_view, 42);
assert(res != ts_view.end());
msg.destroy();
}
{
std::cout << "TEST multi msg\n";
Message3Handle msg{msg_reg, msg_reg.create()};
msg.emplace<Message::Components::Timestamp>(41ul);
Message3Handle msg2{msg_reg, msg_reg.create()};
msg2.emplace<Message::Components::Timestamp>(42ul);
Message3Handle msg3{msg_reg, msg_reg.create()};
msg3.emplace<Message::Components::Timestamp>(43ul);
// see message3/message_time_sort.cpp
msg_reg.sort<Message::Components::Timestamp>([](const auto& lhs, const auto& rhs) -> bool {
return lhs.ts > rhs.ts;
}, entt::insertion_sort{});
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
auto res = find_start_by_ts(ts_view, 42);
assert(res != ts_view.end());
assert(*res == msg2);
res++;
assert(*res == msg);
msg3.destroy();
msg2.destroy();
msg.destroy();
}
return 0;
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <algorithm>
#include <cstdint>
#include <iostream>
// perform binary search to find the first message not newer than ts_start
template<typename View>
auto find_start_by_ts(const View& view, uint64_t ts_start) {
//std::cout << "!!!! starting msg ts search, ts_start:" << ts_start << "\n";
// -> first value smaller than start ts
auto res = std::lower_bound(
view.begin(), view.end(),
ts_start,
[&view](const auto& a, const auto& b) {
const auto& [a_comp] = view.get(a);
return a_comp.ts > b; // > bc ts is sorted high to low?
}
);
if (res != view.end()) {
const auto& [ts_comp] = view.get(*res);
//std::cout << "!!!! first value not newer than start ts is " << ts_comp.ts << "\n";
} else {
//std::cout << "!!!! no first value not newer than start ts\n";
}
return res;
}