Compare commits
29 Commits
ignore_tim
...
master
Author | SHA1 | Date | |
---|---|---|---|
|
7cd68845ca | ||
|
f40907d42a | ||
|
6d7d643207 | ||
|
b35babe3f8 | ||
|
78390dd342 | ||
|
eb169b2779 | ||
|
100279a483 | ||
|
60b3d5d941 | ||
|
6ad2905e07 | ||
|
930c829031 | ||
|
a139f412b1 | ||
|
abdf6672bf | ||
|
7bbaa9b929 | ||
|
70620a901b | ||
|
231928339e | ||
|
294c5346ca | ||
|
adeaca4efe | ||
|
ba809eda43 | ||
|
b9a7c75d20 | ||
|
04b6f7925a | ||
|
5601ad91f5 | ||
|
741f1428d3 | ||
|
34dc01d4dc | ||
|
1bf1fbce75 | ||
37b92f67c8 | |||
01c892df8c | |||
6eb5826616 | |||
2e6b15e4ad | |||
63de78aaeb |
@ -1,9 +1,21 @@
|
||||
cmake_minimum_required(VERSION 3.9 FATAL_ERROR)
|
||||
cmake_minimum_required(VERSION 3.24 FATAL_ERROR)
|
||||
|
||||
add_subdirectory(./external)
|
||||
|
||||
project(solanaceae)
|
||||
|
||||
if (CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR)
|
||||
set(SOLANACEAE_NGCFT1_STANDALONE ON)
|
||||
else()
|
||||
set(SOLANACEAE_NGCFT1_STANDALONE OFF)
|
||||
endif()
|
||||
message("II SOLANACEAE_NGCFT1_STANDALONE " ${SOLANACEAE_NGCFT1_STANDALONE})
|
||||
|
||||
option(SOLANACEAE_NGCFT1_BUILD_PLUGINS "Build the solanaceae_ngcft1 plugins" ${SOLANACEAE_NGCFT1_BUILD_PLUGINS})
|
||||
|
||||
# TODO: move this stuff to src
|
||||
########################################
|
||||
|
||||
add_library(solanaceae_ngcext
|
||||
./solanaceae/ngc_ext/ngcext.hpp
|
||||
./solanaceae/ngc_ext/ngcext.cpp
|
||||
@ -43,21 +55,6 @@ target_link_libraries(solanaceae_ngcft1 PUBLIC
|
||||
|
||||
########################################
|
||||
|
||||
add_library(solanaceae_ngchs2
|
||||
./solanaceae/ngc_hs2/ngc_hs2.hpp
|
||||
./solanaceae/ngc_hs2/ngc_hs2.cpp
|
||||
)
|
||||
target_include_directories(solanaceae_ngchs2 PUBLIC .)
|
||||
target_compile_features(solanaceae_ngchs2 PUBLIC cxx_std_17)
|
||||
target_link_libraries(solanaceae_ngchs2 PUBLIC
|
||||
solanaceae_ngcft1
|
||||
solanaceae_tox_contacts
|
||||
solanaceae_message3
|
||||
solanaceae_object_store
|
||||
)
|
||||
|
||||
########################################
|
||||
|
||||
add_library(solanaceae_sha1_ngcft1
|
||||
# hacky deps
|
||||
./solanaceae/ngc_ft1_sha1/mio.hpp
|
||||
@ -117,8 +114,6 @@ target_link_libraries(solanaceae_sha1_ngcft1 PUBLIC
|
||||
solanaceae_file2
|
||||
)
|
||||
|
||||
########################################
|
||||
|
||||
option(SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING "Build the solanaceae_ngcft1_sha1 tests" OFF)
|
||||
message("II SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING " ${SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING})
|
||||
|
||||
@ -136,3 +131,51 @@ if (SOLANACEAE_NGCFT1_SHA1_BUILD_TESTING)
|
||||
|
||||
endif()
|
||||
|
||||
########################################
|
||||
|
||||
add_library(solanaceae_ngchs2
|
||||
./solanaceae/ngc_hs2/serl.hpp
|
||||
|
||||
./solanaceae/ngc_hs2/ts_find_start.hpp
|
||||
|
||||
./solanaceae/ngc_hs2/ngc_hs2_sigma.hpp
|
||||
./solanaceae/ngc_hs2/ngc_hs2_sigma.cpp
|
||||
|
||||
./solanaceae/ngc_hs2/ngc_hs2_rizzler.hpp
|
||||
./solanaceae/ngc_hs2/ngc_hs2_rizzler.cpp
|
||||
)
|
||||
target_include_directories(solanaceae_ngchs2 PUBLIC .)
|
||||
target_compile_features(solanaceae_ngchs2 PUBLIC cxx_std_17)
|
||||
target_link_libraries(solanaceae_ngchs2 PUBLIC
|
||||
solanaceae_ngcft1
|
||||
solanaceae_sha1_ngcft1 # HACK: properly abstract filekind/-id
|
||||
solanaceae_tox_contacts
|
||||
solanaceae_message3
|
||||
solanaceae_object_store
|
||||
nlohmann_json::nlohmann_json
|
||||
)
|
||||
|
||||
option(SOLANACEAE_NGCHS2_BUILD_TESTING "Build the solanaceae_ngchs2 tests" OFF)
|
||||
message("II SOLANACEAE_NGCHS2_BUILD_TESTING " ${SOLANACEAE_NGCHS2_BUILD_TESTING})
|
||||
|
||||
if (SOLANACEAE_NGCHS2_BUILD_TESTING)
|
||||
include(CTest)
|
||||
|
||||
add_executable(test_hs2_ts_binarysearch
|
||||
./solanaceae/ngc_hs2/test_ts_binarysearch.cpp
|
||||
)
|
||||
|
||||
target_link_libraries(test_hs2_ts_binarysearch PUBLIC
|
||||
solanaceae_ngchs2
|
||||
)
|
||||
|
||||
add_test(NAME test_hs2_ts_binarysearch COMMAND test_hs2_ts_binarysearch)
|
||||
|
||||
endif()
|
||||
|
||||
########################################
|
||||
|
||||
if (SOLANACEAE_NGCFT1_BUILD_PLUGINS)
|
||||
add_subdirectory(./plugins)
|
||||
endif()
|
||||
|
||||
|
10
external/CMakeLists.txt
vendored
10
external/CMakeLists.txt
vendored
@ -2,3 +2,13 @@ cmake_minimum_required(VERSION 3.9 FATAL_ERROR)
|
||||
|
||||
add_subdirectory(./sha1)
|
||||
|
||||
# we are running a custom msgpack serialization for hs2
|
||||
if (NOT TARGET nlohmann_json::nlohmann_json)
|
||||
FetchContent_Declare(json
|
||||
URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz
|
||||
URL_HASH SHA256=d6c65aca6b1ed68e7a182f4757257b107ae403032760ed6ef121c9d55e81757d
|
||||
EXCLUDE_FROM_ALL
|
||||
)
|
||||
FetchContent_MakeAvailable(json)
|
||||
endif()
|
||||
|
||||
|
33
plugins/CMakeLists.txt
Normal file
33
plugins/CMakeLists.txt
Normal file
@ -0,0 +1,33 @@
|
||||
cmake_minimum_required(VERSION 3.9...3.24 FATAL_ERROR)
|
||||
|
||||
########################################
|
||||
|
||||
add_library(plugin_ngcft1 MODULE
|
||||
./plugin_ngcft1.cpp
|
||||
)
|
||||
target_compile_features(plugin_ngcft1 PUBLIC cxx_std_17)
|
||||
set_target_properties(plugin_ngcft1 PROPERTIES
|
||||
C_VISIBILITY_PRESET hidden
|
||||
)
|
||||
target_compile_definitions(plugin_ngcft1 PUBLIC ENTT_API_IMPORT)
|
||||
target_link_libraries(plugin_ngcft1 PUBLIC
|
||||
solanaceae_plugin
|
||||
solanaceae_ngcext
|
||||
solanaceae_ngcft1
|
||||
solanaceae_sha1_ngcft1
|
||||
)
|
||||
|
||||
########################################
|
||||
|
||||
add_library(plugin_ngchs2 MODULE
|
||||
./plugin_ngchs2.cpp
|
||||
)
|
||||
target_compile_features(plugin_ngchs2 PUBLIC cxx_std_17)
|
||||
set_target_properties(plugin_ngchs2 PROPERTIES
|
||||
C_VISIBILITY_PRESET hidden
|
||||
)
|
||||
target_compile_definitions(plugin_ngchs2 PUBLIC ENTT_API_IMPORT)
|
||||
target_link_libraries(plugin_ngchs2 PUBLIC
|
||||
solanaceae_plugin
|
||||
solanaceae_ngchs2
|
||||
)
|
81
plugins/plugin_ngcft1.cpp
Normal file
81
plugins/plugin_ngcft1.cpp
Normal file
@ -0,0 +1,81 @@
|
||||
#include <solanaceae/plugin/solana_plugin_v1.h>
|
||||
|
||||
#include <solanaceae/ngc_ext/ngcext.hpp>
|
||||
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||
#include <solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp>
|
||||
|
||||
#include <entt/entt.hpp>
|
||||
#include <entt/fwd.hpp>
|
||||
|
||||
#include <memory>
|
||||
#include <iostream>
|
||||
|
||||
static std::unique_ptr<NGCEXTEventProvider> g_ngcextep = nullptr;
|
||||
// TODO: make sep plug
|
||||
static std::unique_ptr<NGCFT1> g_ngcft1 = nullptr;
|
||||
static std::unique_ptr<SHA1_NGCFT1> g_sha1_ngcft1 = nullptr;
|
||||
|
||||
constexpr const char* plugin_name = "NGCEXT";
|
||||
|
||||
extern "C" {
|
||||
|
||||
SOLANA_PLUGIN_EXPORT const char* solana_plugin_get_name(void) {
|
||||
return plugin_name;
|
||||
}
|
||||
|
||||
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_get_version(void) {
|
||||
return SOLANA_PLUGIN_VERSION;
|
||||
}
|
||||
|
||||
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api) {
|
||||
std::cout << "PLUGIN " << plugin_name << " START()\n";
|
||||
|
||||
if (solana_api == nullptr) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
try {
|
||||
auto* os = PLUG_RESOLVE_INSTANCE(ObjectStore2);
|
||||
auto* tox_i = PLUG_RESOLVE_INSTANCE(ToxI);
|
||||
auto* tox_event_provider_i = PLUG_RESOLVE_INSTANCE(ToxEventProviderI);
|
||||
auto* cr = PLUG_RESOLVE_INSTANCE_VERSIONED(Contact3Registry, "1");
|
||||
auto* rmm = PLUG_RESOLVE_INSTANCE(RegistryMessageModelI);
|
||||
auto* tcm = PLUG_RESOLVE_INSTANCE(ToxContactModel2);
|
||||
|
||||
// static store, could be anywhere tho
|
||||
// construct with fetched dependencies
|
||||
g_ngcextep = std::make_unique<NGCEXTEventProvider>(*tox_i, *tox_event_provider_i);
|
||||
g_ngcft1 = std::make_unique<NGCFT1>(*tox_i, *tox_event_provider_i, *g_ngcextep.get());
|
||||
g_sha1_ngcft1 = std::make_unique<SHA1_NGCFT1>(*os, *cr, *rmm, *g_ngcft1.get(), *tcm, *tox_event_provider_i, *g_ngcextep.get());
|
||||
|
||||
// register types
|
||||
PLUG_PROVIDE_INSTANCE(NGCEXTEventProviderI, plugin_name, g_ngcextep.get());
|
||||
|
||||
PLUG_PROVIDE_INSTANCE(NGCFT1EventProviderI, plugin_name, g_ngcft1.get());
|
||||
PLUG_PROVIDE_INSTANCE(NGCFT1, plugin_name, g_ngcft1.get());
|
||||
|
||||
PLUG_PROVIDE_INSTANCE(SHA1_NGCFT1, plugin_name, g_sha1_ngcft1.get());
|
||||
} catch (const ResolveException& e) {
|
||||
std::cerr << "PLUGIN " << plugin_name << " " << e.what << "\n";
|
||||
return 2;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SOLANA_PLUGIN_EXPORT void solana_plugin_stop(void) {
|
||||
std::cout << "PLUGIN " << plugin_name << " STOP()\n";
|
||||
|
||||
g_sha1_ngcft1.reset();
|
||||
g_ngcft1.reset();
|
||||
g_ngcextep.reset();
|
||||
}
|
||||
|
||||
SOLANA_PLUGIN_EXPORT float solana_plugin_tick(float delta) {
|
||||
const float ft_interval = g_ngcft1->iterate(delta);
|
||||
const float sha_interval = g_sha1_ngcft1->iterate(delta);
|
||||
return std::min<float>(ft_interval, sha_interval);
|
||||
}
|
||||
|
||||
} // extern C
|
||||
|
79
plugins/plugin_ngchs2.cpp
Normal file
79
plugins/plugin_ngchs2.cpp
Normal file
@ -0,0 +1,79 @@
|
||||
#include <solanaceae/plugin/solana_plugin_v1.h>
|
||||
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
|
||||
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||
#include <solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp> // this hurts
|
||||
#include <solanaceae/ngc_hs2/ngc_hs2_sigma.hpp>
|
||||
#include <solanaceae/ngc_hs2/ngc_hs2_rizzler.hpp>
|
||||
|
||||
#include <entt/entt.hpp>
|
||||
#include <entt/fwd.hpp>
|
||||
|
||||
#include <memory>
|
||||
#include <iostream>
|
||||
|
||||
// https://youtu.be/OwT83dN82pc
|
||||
|
||||
static std::unique_ptr<NGCHS2Sigma> g_ngchs2s = nullptr;
|
||||
static std::unique_ptr<NGCHS2Rizzler> g_ngchs2r = nullptr;
|
||||
|
||||
constexpr const char* plugin_name = "NGCHS2";
|
||||
|
||||
extern "C" {
|
||||
|
||||
SOLANA_PLUGIN_EXPORT const char* solana_plugin_get_name(void) {
|
||||
return plugin_name;
|
||||
}
|
||||
|
||||
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_get_version(void) {
|
||||
return SOLANA_PLUGIN_VERSION;
|
||||
}
|
||||
|
||||
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api) {
|
||||
std::cout << "PLUGIN " << plugin_name << " START()\n";
|
||||
|
||||
if (solana_api == nullptr) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
try {
|
||||
//auto* tox_i = PLUG_RESOLVE_INSTANCE(ToxI);
|
||||
auto* tox_event_provider_i = PLUG_RESOLVE_INSTANCE(ToxEventProviderI);
|
||||
auto* cr = PLUG_RESOLVE_INSTANCE_VERSIONED(Contact3Registry, "1");
|
||||
auto* rmm = PLUG_RESOLVE_INSTANCE(RegistryMessageModelI);
|
||||
auto* tcm = PLUG_RESOLVE_INSTANCE(ToxContactModel2);
|
||||
auto* ngcft1 = PLUG_RESOLVE_INSTANCE(NGCFT1);
|
||||
auto* sha1_ngcft1 = PLUG_RESOLVE_INSTANCE(SHA1_NGCFT1);
|
||||
|
||||
// static store, could be anywhere tho
|
||||
// construct with fetched dependencies
|
||||
g_ngchs2s = std::make_unique<NGCHS2Sigma>(*cr, *rmm, *tcm, *ngcft1);
|
||||
g_ngchs2r = std::make_unique<NGCHS2Rizzler>(*cr, *rmm, *tcm, *ngcft1, *tox_event_provider_i, *sha1_ngcft1);
|
||||
|
||||
// register types
|
||||
PLUG_PROVIDE_INSTANCE(NGCHS2Sigma, plugin_name, g_ngchs2s.get());
|
||||
PLUG_PROVIDE_INSTANCE(NGCHS2Rizzler, plugin_name, g_ngchs2r.get());
|
||||
} catch (const ResolveException& e) {
|
||||
std::cerr << "PLUGIN " << plugin_name << " " << e.what << "\n";
|
||||
return 2;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SOLANA_PLUGIN_EXPORT void solana_plugin_stop(void) {
|
||||
std::cout << "PLUGIN " << plugin_name << " STOP()\n";
|
||||
|
||||
g_ngchs2r.reset();
|
||||
g_ngchs2s.reset();
|
||||
}
|
||||
|
||||
SOLANA_PLUGIN_EXPORT float solana_plugin_tick(float delta) {
|
||||
const float sigma_interval = g_ngchs2s->iterate(delta);
|
||||
const float rizzler_interval = g_ngchs2r->iterate(delta);
|
||||
return std::min<float>(sigma_interval, rizzler_interval);
|
||||
}
|
||||
|
||||
} // extern C
|
||||
|
@ -255,6 +255,7 @@ bool NGCEXTEventProvider::parse_ft1_data_ack(
|
||||
_DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false)
|
||||
e.transfer_id = data[curser++];
|
||||
|
||||
e.sequence_ids.reserve(std::max<int64_t>(data_size-curser, 1)/sizeof(uint16_t));
|
||||
while (curser < data_size) {
|
||||
_DATA_HAVE(sizeof(uint16_t), std::cerr << "NGCEXT: packet too small, missing seq_id\n"; return false)
|
||||
uint16_t seq_id = data[curser++];
|
||||
|
@ -61,10 +61,6 @@ struct CCAI {
|
||||
// returns -1 if not implemented, can return 0
|
||||
virtual int64_t inFlightBytes(void) const { return -1; }
|
||||
|
||||
// returns -1 if not implemented, can return 0
|
||||
// excluded timed out packets (not those currently resent)
|
||||
virtual int64_t inFlightBytesAccounted(void) const { return -1; }
|
||||
|
||||
public: // callbacks
|
||||
// data size is without overhead
|
||||
virtual void onSent(SeqIDType seq, size_t data_size) = 0;
|
||||
|
@ -93,8 +93,7 @@ int64_t CUBIC::canSend(float time_delta) {
|
||||
}
|
||||
|
||||
const auto window = getCWnD();
|
||||
//int64_t cspace_bytes = window - _in_flight_bytes;
|
||||
int64_t cspace_bytes = window - _in_flight_bytes_accounted;
|
||||
int64_t cspace_bytes = window - _in_flight_bytes;
|
||||
if (cspace_bytes < MAXIMUM_SEGMENT_DATA_SIZE) {
|
||||
//std::cerr << "CUBIC: cspace < seg size\n";
|
||||
return 0u;
|
||||
|
@ -7,7 +7,9 @@
|
||||
|
||||
float FlowOnly::getCurrentDelay(void) const {
|
||||
// below 1ms is useless
|
||||
return std::clamp(_rtt_ema, 0.001f, RTT_MAX);
|
||||
//return std::clamp(_rtt_ema, 0.001f, RTT_MAX);
|
||||
// the current iterate rate min is 5ms
|
||||
return std::clamp(_rtt_ema, 0.005f, RTT_MAX);
|
||||
}
|
||||
|
||||
void FlowOnly::addRTT(float new_delay) {
|
||||
@ -29,25 +31,6 @@ void FlowOnly::updateWindow(void) {
|
||||
_fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE);
|
||||
}
|
||||
|
||||
void FlowOnly::updateAccounted(void) {
|
||||
int64_t size_timedout {0};
|
||||
|
||||
{ // can be expensive
|
||||
// code see getTimeouts()
|
||||
// after 3 rtt delay, we trigger timeout
|
||||
const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;
|
||||
|
||||
for (const auto& [seq, time_stamp, size, _] : _in_flight) {
|
||||
if (now_adjusted > time_stamp) {
|
||||
//list.push_back(seq);
|
||||
size_timedout += size;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_in_flight_bytes_accounted = _in_flight_bytes - size_timedout;
|
||||
}
|
||||
|
||||
void FlowOnly::updateCongestion(void) {
|
||||
updateWindow();
|
||||
const auto tmp_window = getWindow();
|
||||
@ -89,10 +72,8 @@ int64_t FlowOnly::canSend(float time_delta) {
|
||||
}
|
||||
|
||||
updateWindow();
|
||||
updateAccounted();
|
||||
|
||||
//int64_t fspace = _fwnd - _in_flight_bytes;
|
||||
int64_t fspace = _fwnd - _in_flight_bytes_accounted;
|
||||
int64_t fspace = _fwnd - _in_flight_bytes;
|
||||
if (fspace < MAXIMUM_SEGMENT_DATA_SIZE) {
|
||||
return 0u;
|
||||
}
|
||||
@ -107,6 +88,7 @@ int64_t FlowOnly::canSend(float time_delta) {
|
||||
|
||||
std::vector<FlowOnly::SeqIDType> FlowOnly::getTimeouts(void) const {
|
||||
std::vector<SeqIDType> list;
|
||||
list.reserve(_in_flight.size()/3); // we dont know, so we just guess
|
||||
|
||||
// after 3 rtt delay, we trigger timeout
|
||||
const auto now_adjusted = getTimeNow() - getCurrentDelay()*3.f;
|
||||
@ -128,10 +110,6 @@ int64_t FlowOnly::inFlightBytes(void) const {
|
||||
return _in_flight_bytes;
|
||||
}
|
||||
|
||||
int64_t FlowOnly::inFlightBytesAccounted(void) const {
|
||||
return _in_flight_bytes_accounted;
|
||||
}
|
||||
|
||||
void FlowOnly::onSent(SeqIDType seq, size_t data_size) {
|
||||
if constexpr (true) {
|
||||
size_t sum {0u};
|
||||
|
@ -12,7 +12,7 @@ struct FlowOnly : public CCAI {
|
||||
public: // config
|
||||
static constexpr float RTT_EMA_ALPHA = 0.001f; // might need change over time
|
||||
static constexpr float RTT_UP_MAX = 3.0f; // how much larger a delay can be to be taken into account
|
||||
static constexpr float RTT_MAX = 2.f; // 2 sec is probably too much
|
||||
static constexpr float RTT_MAX = 2.f; // maybe larger for tunneled connections
|
||||
|
||||
protected:
|
||||
// initialize to low value, will get corrected very fast
|
||||
@ -32,7 +32,6 @@ struct FlowOnly : public CCAI {
|
||||
};
|
||||
std::vector<FlyingBunch> _in_flight;
|
||||
int64_t _in_flight_bytes {0};
|
||||
int64_t _in_flight_bytes_accounted {0};
|
||||
|
||||
int32_t _consecutive_events {0};
|
||||
|
||||
@ -59,8 +58,6 @@ struct FlowOnly : public CCAI {
|
||||
|
||||
void updateWindow(void);
|
||||
|
||||
void updateAccounted(void);
|
||||
|
||||
virtual void onCongestion(void) {};
|
||||
|
||||
// internal logic, calls the onCongestion() event
|
||||
@ -80,7 +77,6 @@ struct FlowOnly : public CCAI {
|
||||
|
||||
int64_t inFlightCount(void) const override;
|
||||
int64_t inFlightBytes(void) const override;
|
||||
int64_t inFlightBytesAccounted(void) const override;
|
||||
|
||||
public: // callbacks
|
||||
// data size is without overhead
|
||||
|
@ -56,7 +56,9 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
timeouts_set.erase({idx, id});
|
||||
can_packet_size -= data.size();
|
||||
} else {
|
||||
#if 0 // too spammy
|
||||
std::cerr << "NGCFT1 warning: no space to resend timedout\n";
|
||||
#endif
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -169,7 +171,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
|
||||
}
|
||||
}
|
||||
|
||||
void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
|
||||
bool NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
|
||||
if (peer.cca) {
|
||||
auto timeouts = peer.cca->getTimeouts();
|
||||
std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
|
||||
@ -200,6 +202,7 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
|
||||
}
|
||||
}
|
||||
|
||||
bool recv_activity {false};
|
||||
for (size_t idx = 0; idx < peer.recv_transfers.size(); idx++) {
|
||||
if (!peer.recv_transfers.at(idx).has_value()) {
|
||||
continue;
|
||||
@ -209,20 +212,21 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
|
||||
|
||||
// proper switch case?
|
||||
if (transfer.state == Group::Peer::RecvTransfer::State::FINISHING) {
|
||||
transfer.finishing_timer -= time_delta;
|
||||
if (transfer.finishing_timer <= 0.f) {
|
||||
//dispatch(
|
||||
// NGCFT1_Event::recv_done,
|
||||
// Events::NGCFT1_recv_done{
|
||||
// group_number, peer_number,
|
||||
// uint8_t(idx)
|
||||
// }
|
||||
//);
|
||||
|
||||
transfer.timer -= time_delta;
|
||||
if (transfer.timer <= 0.f) {
|
||||
peer.recv_transfers.at(idx).reset();
|
||||
}
|
||||
recv_activity = true; // count as activity, not sure we need this
|
||||
} else {
|
||||
transfer.timer += time_delta;
|
||||
if (transfer.timer < 0.5f) {
|
||||
// back off when no activity
|
||||
recv_activity = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return peer.active_send_transfers > 0 || recv_activity;
|
||||
}
|
||||
|
||||
const CCAI* NGCFT1::getPeerCCA(
|
||||
@ -268,37 +272,61 @@ NGCFT1::NGCFT1(
|
||||
|
||||
float NGCFT1::iterate(float time_delta) {
|
||||
_time_since_activity += time_delta;
|
||||
bool transfer_in_progress {false};
|
||||
bool transfer_activity {false};
|
||||
for (auto& [group_number, group] : groups) {
|
||||
for (auto& [peer_number, peer] : group.peers) {
|
||||
iteratePeer(time_delta, group_number, peer_number, peer);
|
||||
transfer_activity = transfer_activity || iteratePeer(time_delta, group_number, peer_number, peer);
|
||||
|
||||
#if 0
|
||||
// find any active transfer
|
||||
if (!transfer_in_progress) {
|
||||
if (!transfer_activity) {
|
||||
for (const auto& t : peer.send_transfers) {
|
||||
if (t.has_value()) {
|
||||
transfer_in_progress = true;
|
||||
transfer_activity = true;
|
||||
#if 0
|
||||
std::cout
|
||||
<< "--- active send transfer "
|
||||
<< group_number << ":" << peer_number
|
||||
<< "(" << std::get<0>(_t.toxGroupPeerGetName(group_number, peer_number)).value_or("<unk>") << ")"
|
||||
<< " fk:" << t.value().file_kind
|
||||
<< " state:" << (int)t.value().state
|
||||
<< " tsa:" << t.value().time_since_activity
|
||||
<< "\n"
|
||||
;
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!transfer_in_progress) {
|
||||
if (!transfer_activity) {
|
||||
for (const auto& t : peer.recv_transfers) {
|
||||
if (t.has_value()) {
|
||||
transfer_in_progress = true;
|
||||
transfer_activity = true;
|
||||
#if 0
|
||||
std::cout
|
||||
<< "--- active recv transfer "
|
||||
<< group_number << ":" << peer_number
|
||||
<< "(" << std::get<0>(_t.toxGroupPeerGetName(group_number, peer_number)).value_or("<unk>") << ")"
|
||||
<< " fk:" << t.value().file_kind
|
||||
<< " state:" << (int)t.value().state
|
||||
<< " ft:" << t.value().finishing_timer
|
||||
<< "\n"
|
||||
;
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
if (transfer_in_progress) {
|
||||
if (transfer_activity) {
|
||||
_time_since_activity = 0.f;
|
||||
// ~15ms for up to 1mb/s
|
||||
// ~5ms for up to 4mb/s
|
||||
return 0.005f; // 5ms
|
||||
} else if (_time_since_activity < 0.5f) {
|
||||
} else if (_time_since_activity < 1.0f) {
|
||||
// bc of temporality
|
||||
return 0.025f;
|
||||
} else {
|
||||
@ -306,13 +334,12 @@ float NGCFT1::iterate(float time_delta) {
|
||||
}
|
||||
}
|
||||
|
||||
void NGCFT1::NGC_FT1_send_request_private(
|
||||
bool NGCFT1::NGC_FT1_send_request_private(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
uint32_t file_kind,
|
||||
const uint8_t* file_id, uint32_t file_id_size
|
||||
) {
|
||||
// TODO: error check
|
||||
_neep.send_ft1_request(group_number, peer_number, file_kind, file_id, file_id_size);
|
||||
return _neep.send_ft1_request(group_number, peer_number, file_kind, file_id, file_id_size);
|
||||
}
|
||||
|
||||
bool NGCFT1::NGC_FT1_send_init_private(
|
||||
@ -456,44 +483,6 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
|
||||
//#if !NDEBUG
|
||||
std::cout << "NGCFT1: got FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << int(e.transfer_id) << " [" << bin2hex(e.file_id) << "]\n";
|
||||
//#endif
|
||||
#if 0
|
||||
bool accept = false;
|
||||
dispatch(
|
||||
NGCFT1_Event::recv_init,
|
||||
Events::NGCFT1_recv_init{
|
||||
e.group_number, e.peer_number,
|
||||
static_cast<NGCFT1_file_kind>(e.file_kind),
|
||||
e.file_id.data(), static_cast<uint32_t>(e.file_id.size()),
|
||||
e.transfer_id,
|
||||
e.file_size,
|
||||
accept
|
||||
}
|
||||
);
|
||||
|
||||
if (!accept) {
|
||||
std::cout << "NGCFT1: rejected init\n";
|
||||
return true; // return true?
|
||||
}
|
||||
|
||||
_neep.send_ft1_init_ack(e.group_number, e.peer_number, e.transfer_id);
|
||||
|
||||
std::cout << "NGCFT1: accepted init\n";
|
||||
|
||||
auto& peer = groups[e.group_number].peers[e.peer_number];
|
||||
if (peer.recv_transfers[e.transfer_id].has_value()) {
|
||||
std::cerr << "NGCFT1 warning: overwriting existing recv_transfer " << int(e.transfer_id) << ", other peer started new transfer on preexising\n";
|
||||
}
|
||||
|
||||
peer.recv_transfers[e.transfer_id] = Group::Peer::RecvTransfer{
|
||||
e.file_kind,
|
||||
e.file_id,
|
||||
Group::Peer::RecvTransfer::State::INITED,
|
||||
e.file_size,
|
||||
0u,
|
||||
{} // rsb
|
||||
};
|
||||
return true;
|
||||
#else
|
||||
// HACK: simply forward to init2 hanlder
|
||||
return onEvent(Events::NGCEXT_ft1_init2{
|
||||
e.group_number,
|
||||
@ -504,12 +493,11 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
|
||||
0x00, // non set
|
||||
e.file_id, // sadly a copy, wont matter in the future
|
||||
});
|
||||
#endif
|
||||
}
|
||||
|
||||
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
|
||||
//#if !NDEBUG
|
||||
std::cout << "NGCFT1: got FT1_INIT_ACK mds:" << e.max_lossy_data_size << "\n";
|
||||
std::cout << "NGCFT1: got FT1_INIT_ACK " << e.group_number << ":" << e.peer_number << " mds:" << e.max_lossy_data_size << "\n";
|
||||
//#endif
|
||||
|
||||
// we now should start sending data
|
||||
@ -587,6 +575,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
|
||||
}
|
||||
|
||||
auto& transfer = peer.recv_transfers[e.transfer_id].value();
|
||||
transfer.timer = 0.f;
|
||||
|
||||
// do reassembly, ignore dups
|
||||
transfer.rsb.add(e.sequence_id, std::vector<uint8_t>(e.data)); // TODO: ugly explicit copy for what should just be a move
|
||||
@ -625,7 +614,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
|
||||
|
||||
// TODO: keep around for remote timeout + delay + offset, so we can be sure all acks where received
|
||||
// or implement a dedicated finished that needs to be acked
|
||||
transfer.finishing_timer = 0.75f; // TODO: we are receiving, we dont know delay
|
||||
//transfer.timer = 0.75f; // TODO: we are receiving, we dont know delay
|
||||
transfer.timer = FlowOnly::RTT_MAX;
|
||||
|
||||
dispatch(
|
||||
NGCFT1_Event::recv_done,
|
||||
@ -669,6 +659,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) {
|
||||
|
||||
{
|
||||
std::vector<CCAI::SeqIDType> seqs;
|
||||
seqs.reserve(e.sequence_ids.size());
|
||||
for (const auto it : e.sequence_ids) {
|
||||
// TODO: improve this o.o
|
||||
seqs.push_back({e.transfer_id, it});
|
||||
|
@ -156,16 +156,17 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
|
||||
std::vector<uint8_t> file_id;
|
||||
|
||||
enum class State {
|
||||
INITED, //init acked, but no data received yet (might be dropped)
|
||||
RECV, // receiving data
|
||||
FINISHING, // got all the data, but we wait for 2*delay, since its likely there is data still arriving
|
||||
INITED, // init acked, but no data received yet (might be dropped)
|
||||
RECV, // receiving data
|
||||
FINISHING, // got all the data, but we wait for 2*delay, since its likely there is data still arriving
|
||||
} state;
|
||||
|
||||
uint64_t file_size {0};
|
||||
uint64_t file_size_current {0};
|
||||
|
||||
// if state INITED or RECV, time since last activity
|
||||
// if state FINISHING and it reaches 0, delete
|
||||
float finishing_timer {0.f};
|
||||
float timer {0.f};
|
||||
|
||||
// sequence id based reassembly
|
||||
RecvSequenceBuffer rsb;
|
||||
@ -209,7 +210,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
|
||||
|
||||
protected:
|
||||
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
|
||||
void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
|
||||
bool iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
|
||||
|
||||
const CCAI* getPeerCCA(uint32_t group_number, uint32_t peer_number) const;
|
||||
|
||||
@ -223,7 +224,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
|
||||
float iterate(float delta);
|
||||
|
||||
public: // ft1 api
|
||||
void NGC_FT1_send_request_private(
|
||||
bool NGC_FT1_send_request_private(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
uint32_t file_kind,
|
||||
const uint8_t* file_id, uint32_t file_id_size
|
||||
|
@ -16,7 +16,7 @@ enum class NGCFT1_file_kind : uint32_t {
|
||||
// id: TOX_FILE_ID_LENGTH (32) bytes
|
||||
// this is basically and id and probably not a hash, like the tox friend api
|
||||
// this id can be unique between 2 peers
|
||||
ID = 8u,
|
||||
ID = 8u, // TODO: this is actually DATA and 0
|
||||
|
||||
// id: hash of the info, like a torrent infohash (using the same hash as the data)
|
||||
// TODO: determain internal format
|
||||
@ -75,20 +75,7 @@ enum class NGCFT1_file_kind : uint32_t {
|
||||
|
||||
// https://gist.github.com/Green-Sky/440cd9817a7114786850eb4c62dc57c3
|
||||
// id: ts start, ts end
|
||||
// content:
|
||||
// - ts start (do we need this? when this is part of the id?)
|
||||
// - ts end (same)
|
||||
// - list size
|
||||
// - ppk
|
||||
// - mid
|
||||
// - ts
|
||||
HS2_INFO_RANGE_TIME = 0x00000f00,
|
||||
// TODO: half open ranges
|
||||
// TODO: id based
|
||||
// TODO: ppk based?
|
||||
|
||||
// id: ppk, mid, ts
|
||||
HS2_SINGLE_MESSAGE,
|
||||
// TODO: message pack
|
||||
HS2_RANGE_TIME = 0x00000f00, // TODO: remove, did not survive
|
||||
HS2_RANGE_TIME_MSGPACK = 0x00000f02,
|
||||
};
|
||||
|
||||
|
@ -133,6 +133,7 @@ void ChunkPicker::updateParticipation(
|
||||
|
||||
entt::dense_set<Object> checked;
|
||||
for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) {
|
||||
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
|
||||
const ObjectHandle o {objreg, ov};
|
||||
|
||||
if (participating_unfinished.contains(o)) {
|
||||
@ -150,6 +151,21 @@ void ChunkPicker::updateParticipation(
|
||||
participating_unfinished.erase(o);
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: optimize this to only change on dirty, or something
|
||||
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
|
||||
Priority prio = o.get<ObjComp::Ephemeral::File::DownloadPriority>().p;
|
||||
|
||||
uint16_t pskips =
|
||||
prio == Priority::HIGHEST ? 0u :
|
||||
prio == Priority::HIGH ? 1u :
|
||||
prio == Priority::NORMAL ? 2u :
|
||||
prio == Priority::LOW ? 4u :
|
||||
8u // LOWEST
|
||||
;
|
||||
|
||||
participating_unfinished.at(o).should_skip = pskips;
|
||||
}
|
||||
} else {
|
||||
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
|
||||
continue;
|
||||
@ -160,8 +176,6 @@ void ChunkPicker::updateParticipation(
|
||||
}
|
||||
|
||||
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
|
||||
//using Priority = Components::DownloadPriority::Priority;
|
||||
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
|
||||
Priority prio = Priority::NORMAL;
|
||||
|
||||
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
|
||||
@ -239,13 +253,12 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
||||
// round robin content (remember last obj)
|
||||
if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
|
||||
participating_in_last = participating_unfinished.begin()->first;
|
||||
//participating_in_last = *participating_unfinished.begin();
|
||||
}
|
||||
assert(objreg.valid(participating_in_last));
|
||||
|
||||
auto it = participating_unfinished.find(participating_in_last);
|
||||
// hard limit robin rounds to array size time 20
|
||||
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++) {
|
||||
// hard limit robin rounds to array size times 20
|
||||
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++, it++) {
|
||||
if (it == participating_unfinished.end()) {
|
||||
it = participating_unfinished.begin();
|
||||
}
|
||||
@ -254,11 +267,12 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
||||
it->second.skips++;
|
||||
continue;
|
||||
}
|
||||
it->second.skips = 0;
|
||||
|
||||
ObjectHandle o {objreg, it->first};
|
||||
|
||||
// intersect self have with other have
|
||||
if (!o.all_of<Components::RemoteHaveBitset, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
|
||||
if (!o.all_of<ObjComp::F::RemoteHaveBitset, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
|
||||
// rare case where no one else has anything
|
||||
continue;
|
||||
}
|
||||
@ -270,7 +284,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
||||
|
||||
//const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
|
||||
|
||||
const auto& others_have = o.get<Components::RemoteHaveBitset>().others;
|
||||
const auto& others_have = o.get<ObjComp::F::RemoteHaveBitset>().others;
|
||||
auto other_it = others_have.find(c);
|
||||
if (other_it == others_have.end()) {
|
||||
// rare case where the other is participating but has nothing
|
||||
@ -361,7 +375,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
||||
}
|
||||
}
|
||||
|
||||
if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) {
|
||||
if (it == participating_unfinished.end()) {
|
||||
participating_in_last = entt::null;
|
||||
} else {
|
||||
participating_in_last = it->first;
|
||||
|
@ -49,7 +49,8 @@ void ReAnnounceTimer::reset(void) {
|
||||
|
||||
void ReAnnounceTimer::lower(void) {
|
||||
timer *= 0.1f;
|
||||
last_max *= 0.1f;
|
||||
//last_max *= 0.1f; // is this a good idea?
|
||||
last_max *= 0.9f; // is this a good idea?
|
||||
}
|
||||
|
||||
void TransferStatsTally::Peer::trimSent(const float time_now) {
|
||||
|
@ -70,14 +70,6 @@ namespace Components {
|
||||
entt::dense_set<Contact3> participants;
|
||||
};
|
||||
|
||||
struct RemoteHaveBitset {
|
||||
struct Entry {
|
||||
bool have_all {false};
|
||||
BitSet have;
|
||||
};
|
||||
entt::dense_map<Contact3, Entry> others;
|
||||
};
|
||||
|
||||
struct ReRequestInfoTimer {
|
||||
float timer {0.f};
|
||||
};
|
||||
|
@ -63,6 +63,7 @@ size_t FT1InfoSHA1::chunkSize(size_t chunk_index) const {
|
||||
|
||||
std::vector<uint8_t> FT1InfoSHA1::toBuffer(void) const {
|
||||
std::vector<uint8_t> buffer;
|
||||
buffer.reserve(256+8+4+20*chunks.size());
|
||||
|
||||
assert(!file_name.empty());
|
||||
// TODO: optimize
|
||||
|
@ -18,6 +18,12 @@ void re_announce(
|
||||
std::vector<Object> to_remove;
|
||||
os_reg.view<Components::ReAnnounceTimer>().each([&os_reg, &cr, &neep, &to_remove, delta](Object ov, Components::ReAnnounceTimer& rat) {
|
||||
ObjectHandle o{os_reg, ov};
|
||||
// if no known targets, or no hash, remove
|
||||
if (!o.all_of<Components::AnnounceTargets, Components::FT1InfoSHA1Hash>()) {
|
||||
to_remove.push_back(ov);
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: pause
|
||||
//// if paused -> remove
|
||||
//if (o.all_of<Message::Components::Transfer::TagPaused>()) {
|
||||
@ -25,10 +31,11 @@ void re_announce(
|
||||
// return;
|
||||
//}
|
||||
|
||||
// if not downloading or info incomplete -> remove
|
||||
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) {
|
||||
// // if not downloading or info incomplete -> remove
|
||||
//if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) {
|
||||
// if not downloading AND info complete -> remove
|
||||
if (!o.all_of<Components::FT1ChunkSHA1Cache>() && o.all_of<Components::FT1InfoSHA1Data>()) {
|
||||
to_remove.push_back(ov);
|
||||
assert(false && "transfer in broken state");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -7,8 +7,8 @@ void ReceivingTransfers::tick(float delta) {
|
||||
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
|
||||
it->second.time_since_activity += delta;
|
||||
|
||||
// if we have not heard for 20sec, timeout
|
||||
if (it->second.time_since_activity >= 20.f) {
|
||||
// if we have not heard for 60sec, timeout
|
||||
if (it->second.time_since_activity >= 60.f) {
|
||||
std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n";
|
||||
// TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading
|
||||
//it->second.v
|
||||
|
@ -69,6 +69,12 @@ void SHA1_NGCFT1::updateMessages(ObjectHandle o) {
|
||||
assert(o.all_of<Components::Messages>());
|
||||
|
||||
for (auto msg : o.get<Components::Messages>().messages) {
|
||||
// FIXME: hs might create and destory messages for objects without events
|
||||
// we should really do garbage collection
|
||||
if (!msg) {
|
||||
continue;
|
||||
}
|
||||
|
||||
msg.emplace_or_replace<Message::Components::MessageFileObject>(o);
|
||||
|
||||
// messages no long hold this info
|
||||
@ -82,33 +88,20 @@ void SHA1_NGCFT1::updateMessages(ObjectHandle o) {
|
||||
std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ObjectHandle ce) {
|
||||
// get a list of peers we can request this file from
|
||||
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
|
||||
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
|
||||
// TODO: sort by con state?
|
||||
// prio to direct?
|
||||
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
|
||||
continue;
|
||||
|
||||
// 1 in 20 chance to ask random peer instead
|
||||
// also works well for empty SuspectedParticipants
|
||||
if ((_rng()%20) == 0) {
|
||||
tox_peers.clear();
|
||||
// or messages? should be the same
|
||||
if (!ce.all_of<Components::AnnounceTargets>()) {
|
||||
// rip
|
||||
std::cerr << "SHA1_NGCFT1 warning: tried random, but no AnnounceTargets\n";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
|
||||
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
|
||||
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
|
||||
}
|
||||
}
|
||||
|
||||
// 1 in 40 chance to ask random peer instead
|
||||
// TODO: config + tweak
|
||||
// TODO: save group in content to avoid the tox_peers list build
|
||||
// TODO: remove once pc1_announce is shipped
|
||||
if (tox_peers.empty() || (_rng()%40) == 0) {
|
||||
// meh
|
||||
// HACK: determain group based on last tox_peers
|
||||
if (!tox_peers.empty()) {
|
||||
const uint32_t group_number = tox_peers.back().first;
|
||||
auto gch = _tcm.getContactGroup(group_number);
|
||||
assert(static_cast<bool>(gch));
|
||||
|
||||
std::vector<uint32_t> un_tox_peers;
|
||||
for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
|
||||
for (const auto& target : ce.get<Components::AnnounceTargets>().targets) {
|
||||
for (const auto child : _cr.get<Contact::Components::ParentOf>(target).subs) {
|
||||
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
|
||||
continue;
|
||||
}
|
||||
@ -119,26 +112,39 @@ std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(O
|
||||
|
||||
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
|
||||
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
|
||||
un_tox_peers.push_back(tgpe.peer_number);
|
||||
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
|
||||
}
|
||||
}
|
||||
if (un_tox_peers.empty()) {
|
||||
// no one online, we are out of luck
|
||||
} else {
|
||||
const size_t sample_i = _rng()%un_tox_peers.size();
|
||||
const auto peer_number = un_tox_peers.at(sample_i);
|
||||
}
|
||||
std::cout << "SHA1_NGCFT1: doing random peer select over " << tox_peers.size() << " peers\n";
|
||||
} else if (ce.all_of<Components::SuspectedParticipants>()) {
|
||||
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
|
||||
// TODO: sort by con state?
|
||||
// prio to direct?
|
||||
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
|
||||
continue;
|
||||
}
|
||||
|
||||
return std::make_pair(group_number, peer_number);
|
||||
if (_cr.all_of<Contact::Components::TagSelfStrong>(c)) {
|
||||
// FIXME: how did we select ourselfs to be a suspected participant
|
||||
continue;
|
||||
}
|
||||
|
||||
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
|
||||
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
|
||||
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const size_t sample_i = _rng()%tox_peers.size();
|
||||
const auto [group_number, peer_number] = tox_peers.at(sample_i);
|
||||
|
||||
return std::make_pair(group_number, peer_number);
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
if (tox_peers.empty()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
const size_t sample_i = _rng()%tox_peers.size();
|
||||
const auto [group_number, peer_number] = tox_peers.at(sample_i);
|
||||
|
||||
return std::make_pair(group_number, peer_number);
|
||||
}
|
||||
|
||||
void SHA1_NGCFT1::queueBitsetSendFull(Contact3Handle c, ObjectHandle o) {
|
||||
@ -247,10 +253,9 @@ SHA1_NGCFT1::SHA1_NGCFT1(
|
||||
}
|
||||
|
||||
float SHA1_NGCFT1::iterate(float delta) {
|
||||
//std::cerr << "---------- new tick ----------\n";
|
||||
_mfb.tick(); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast
|
||||
|
||||
entt::dense_map<Contact3, size_t> peer_open_requests;
|
||||
_peer_open_requests.clear();
|
||||
|
||||
{ // timers
|
||||
// sending transfers
|
||||
@ -293,13 +298,12 @@ float SHA1_NGCFT1::iterate(float delta) {
|
||||
assert(!o.any_of<ObjComp::F::TagLocalHaveAll>());
|
||||
|
||||
_queue_content_want_info.push_back(o);
|
||||
//_os.registry().remove<Components::ReRequestInfoTimer>(e);
|
||||
o.remove<Components::ReRequestInfoTimer>();
|
||||
// TODO: throw update?
|
||||
}
|
||||
}
|
||||
{ // requested chunk timers
|
||||
_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta, &peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
|
||||
_os.registry().view<Components::FT1ChunkSHA1Requested>().each([this, delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
|
||||
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
|
||||
it->second.timer += delta;
|
||||
|
||||
@ -307,7 +311,7 @@ float SHA1_NGCFT1::iterate(float delta) {
|
||||
if (it->second.timer >= 60.f) {
|
||||
it = ftchunk_requested.chunks.erase(it);
|
||||
} else {
|
||||
peer_open_requests[it->second.c] += 1;
|
||||
_peer_open_requests[it->second.c] += 1;
|
||||
it++;
|
||||
}
|
||||
}
|
||||
@ -420,6 +424,8 @@ float SHA1_NGCFT1::iterate(float delta) {
|
||||
assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
|
||||
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
|
||||
|
||||
//std::cout << "SHA1_NGCFT1: trying to request info\n";
|
||||
|
||||
auto selected_peer_opt = selectPeerForRequest(ce);
|
||||
if (selected_peer_opt.has_value()) {
|
||||
const auto [group_number, peer_number] = selected_peer_opt.value();
|
||||
@ -434,10 +440,12 @@ float SHA1_NGCFT1::iterate(float delta) {
|
||||
);
|
||||
ce.emplace<Components::ReRequestInfoTimer>(0.f);
|
||||
|
||||
_queue_content_want_info.pop_front();
|
||||
|
||||
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
|
||||
} else {
|
||||
_queue_content_want_info.push_back(ce);
|
||||
}
|
||||
|
||||
_queue_content_want_info.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
@ -447,7 +455,7 @@ float SHA1_NGCFT1::iterate(float delta) {
|
||||
Systems::chunk_picker_updates(
|
||||
_cr,
|
||||
_os.registry(),
|
||||
peer_open_requests,
|
||||
_peer_open_requests,
|
||||
_receiving_transfers,
|
||||
_nft,
|
||||
delta
|
||||
@ -456,12 +464,12 @@ float SHA1_NGCFT1::iterate(float delta) {
|
||||
// transfer statistics systems
|
||||
Systems::transfer_tally_update(_os.registry(), getTimeNow());
|
||||
|
||||
if (peer_open_requests.empty()) {
|
||||
if (_peer_open_requests.empty()) {
|
||||
return 2.f;
|
||||
} else {
|
||||
// pretty conservative and should be ajusted on a per peer, per delay basis
|
||||
// seems to do the trick
|
||||
return 0.05f;
|
||||
// ft1 will go lower for us, if we have unresolved info,
|
||||
// we dont want to be stuck in a high tickrate
|
||||
return 0.5f;
|
||||
}
|
||||
}
|
||||
|
||||
@ -552,7 +560,79 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
|
||||
updateMessages(o); // nop // TODO: remove
|
||||
}
|
||||
|
||||
ObjectHandle SHA1_NGCFT1::constructFileMessageInPlace(Message3Handle msg, NGCFT1_file_kind file_kind, ByteSpan file_id) {
|
||||
if (file_kind != NGCFT1_file_kind::HASH_SHA1_INFO) {
|
||||
return {};
|
||||
}
|
||||
|
||||
// check if content exists
|
||||
const std::vector<uint8_t> sha1_info_hash{file_id.cbegin(), file_id.cend()};
|
||||
ObjectHandle o;
|
||||
if (_info_to_content.count(sha1_info_hash)) {
|
||||
o = _info_to_content.at(sha1_info_hash);
|
||||
std::cout << "SHA1_NGCFT1: new message has existing content\n";
|
||||
} else {
|
||||
// TODO: backend
|
||||
o = _mfb.newObject(ByteSpan{sha1_info_hash});
|
||||
_info_to_content[sha1_info_hash] = o;
|
||||
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
|
||||
std::cout << "SHA1_NGCFT1: new message has new content\n";
|
||||
}
|
||||
o.get_or_emplace<Components::Messages>().messages.push_back(msg);
|
||||
msg.emplace_or_replace<Message::Components::MessageFileObject>(o);
|
||||
|
||||
// TODO: use to_c instead?
|
||||
if (const auto* from_c_comp = msg.try_get<Message::Components::ContactFrom>(); from_c_comp != nullptr && _cr.valid(from_c_comp->c)) {
|
||||
Contact3Handle c{_cr, from_c_comp->c};
|
||||
|
||||
// TODO: check if public
|
||||
// since public
|
||||
if (c.all_of<Contact::Components::Parent>()) {
|
||||
// TODO: if this is a dummy contact, should it have parent?
|
||||
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
|
||||
} else {
|
||||
std::cerr << "SHA1_NGCFT1 warning: from contact has no parent, cant add to AnnounceTargets\n";
|
||||
}
|
||||
// TODO: if private, add c directly
|
||||
}
|
||||
|
||||
// queue announce that we are participating
|
||||
o.get_or_emplace<Components::ReAnnounceTimer>(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
|
||||
|
||||
// TODO: queue info dl
|
||||
// TODO: queue info/check if we already have info
|
||||
if (!o.all_of<Components::ReRequestInfoTimer>() && !o.all_of<Components::FT1InfoSHA1>()) {
|
||||
bool in_info_want {false};
|
||||
for (const auto it : _queue_content_want_info) {
|
||||
if (it == o) {
|
||||
in_info_want = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!in_info_want) {
|
||||
// TODO: check if already receiving
|
||||
_queue_content_want_info.push_back(o);
|
||||
}
|
||||
} else if (o.all_of<Components::FT1InfoSHA1>()){
|
||||
// remove from info want
|
||||
o.remove<Components::ReRequestInfoTimer>();
|
||||
|
||||
auto it = std::find(_queue_content_want_info.cbegin(), _queue_content_want_info.cend(), o);
|
||||
if (it != _queue_content_want_info.cend()) {
|
||||
_queue_content_want_info.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
_os.throwEventUpdate(o);
|
||||
|
||||
return o;
|
||||
}
|
||||
|
||||
bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
|
||||
if (_object_update_lock) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!e.e.all_of<ObjComp::Ephemeral::File::ActionTransferAccept>()) {
|
||||
return false;
|
||||
}
|
||||
@ -561,10 +641,17 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
|
||||
// not ready to load yet, skip
|
||||
return false;
|
||||
}
|
||||
|
||||
if (e.e.all_of<Components::FT1ChunkSHA1Cache>()) {
|
||||
// now we update on recv_done, which included info
|
||||
std::cerr << "SHA1_NGCFT1 warning: accepted but already has FT1ChunkSHA1Cache\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
_object_update_lock = true;
|
||||
|
||||
assert(!e.e.all_of<ObjComp::F::TagLocalHaveAll>());
|
||||
assert(!e.e.all_of<Components::FT1ChunkSHA1Cache>());
|
||||
assert(!e.e.all_of<Components::FT1File2>());
|
||||
//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path);
|
||||
|
||||
// first, open file for write(+readback)
|
||||
std::string full_file_path{e.e.get<ObjComp::Ephemeral::File::ActionTransferAccept>().save_to_path};
|
||||
@ -589,6 +676,7 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
|
||||
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
|
||||
// we failed opening that filepath, so we should offer the user the oportunity to save it differently
|
||||
e.e.remove<ObjComp::Ephemeral::File::ActionTransferAccept>(); // stop
|
||||
_object_update_lock = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -667,6 +755,7 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
|
||||
|
||||
updateMessages(e.e);
|
||||
|
||||
_object_update_lock = false;
|
||||
return false; // ?
|
||||
}
|
||||
|
||||
@ -717,7 +806,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
|
||||
}
|
||||
|
||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
|
||||
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
|
||||
if (e.file_id_size != 20) {
|
||||
// error
|
||||
@ -735,7 +824,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
|
||||
|
||||
{ // they advertise interest in the content
|
||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
|
||||
if (addParticipation(c, o)) {
|
||||
// something happend, update chunk picker
|
||||
assert(static_cast<bool>(c));
|
||||
@ -796,7 +885,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
e.accept = true;
|
||||
|
||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
|
||||
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
|
||||
SHA1Digest sha1_chunk_hash {e.file_id, e.file_id_size};
|
||||
|
||||
@ -809,7 +898,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
|
||||
{ // they have the content (probably, might be fake, should move this to done)
|
||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
|
||||
if (addParticipation(c, o)) {
|
||||
// something happend, update chunk picker
|
||||
assert(static_cast<bool>(c));
|
||||
@ -868,7 +957,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
|
||||
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
|
||||
if (!_receiving_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
|
||||
std::cerr << "SHA1_NGCFT1 waring: unknown transfer " << e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n";
|
||||
// not ours
|
||||
//std::cerr << "SHA1_NGCFT1 warning: unknown transfer " << (int)e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -920,7 +1010,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
|
||||
|
||||
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
|
||||
if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
|
||||
std::cerr << "SHA1_NGCFT1 error: ngcft1 requested data for unknown transfer\n";
|
||||
// not ours
|
||||
//std::cerr << "SHA1_NGCFT1 error: ngcft1 requested data for unknown transfer\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -955,7 +1046,16 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
|
||||
// TODO: add event to propergate to messages
|
||||
//_rmm.throwEventUpdate(transfer); // should we?
|
||||
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
Contact3Handle c;
|
||||
const auto tpcc_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
|
||||
if (tpcc_it != _tox_peer_to_contact.cend()) {
|
||||
c = tpcc_it->second;
|
||||
} else {
|
||||
c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
if (static_cast<bool>(c)) {
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c;
|
||||
}
|
||||
}
|
||||
if (static_cast<bool>(c)) {
|
||||
chunk_transfer.content.get_or_emplace<Components::TransferStatsTally>()
|
||||
.tally[c]
|
||||
@ -1024,11 +1124,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
|
||||
o.emplace_or_replace<ObjComp::Ephemeral::File::TagTransferPaused>();
|
||||
|
||||
_os.throwEventUpdate(o);
|
||||
|
||||
updateMessages(o);
|
||||
} else if (transfer.isChunk()) {
|
||||
auto o = transfer.getChunk().content;
|
||||
const auto& info = o.get<Components::FT1InfoSHA1>();
|
||||
auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
|
||||
auto& cc = o.get<Components::FT1ChunkSHA1Cache>(); // is this assumption save?
|
||||
|
||||
// HACK: only check first chunk (they *should* all be the same)
|
||||
const auto chunk_index = transfer.getChunk().chunk_indices.front();
|
||||
@ -1136,6 +1238,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
|
||||
}
|
||||
|
||||
_os.throwEventUpdate(o);
|
||||
|
||||
updateMessages(o); // mostly for received bytes
|
||||
}
|
||||
|
||||
@ -1152,11 +1256,15 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
|
||||
auto& transfer = _sending_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id);
|
||||
|
||||
if (transfer.isChunk()) {
|
||||
// we could cheat here and assume remote has chunk now
|
||||
_os.throwEventUpdate(transfer.getChunk().content);
|
||||
|
||||
updateMessages(transfer.getChunk().content); // mostly for sent bytes
|
||||
}
|
||||
} // ignore info transfer for now
|
||||
|
||||
_sending_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id);
|
||||
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1165,6 +1273,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: make perms go though contacts
|
||||
// TODO: consider temporal component? not here, here is now
|
||||
if (!_tcm.groupPeerCanSpeak(e.group_number, e.peer_number)) {
|
||||
// peer has not the permission to speak, discard
|
||||
return false; // return true?
|
||||
}
|
||||
|
||||
uint64_t ts = Message::getTimeMS();
|
||||
|
||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
@ -1212,60 +1327,12 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
|
||||
rb.try_emplace(self_c, ts);
|
||||
}
|
||||
|
||||
// check if content exists
|
||||
const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
|
||||
ObjectHandle o;
|
||||
if (_info_to_content.count(sha1_info_hash)) {
|
||||
o = _info_to_content.at(sha1_info_hash);
|
||||
std::cout << "SHA1_NGCFT1: new message has existing content\n";
|
||||
} else {
|
||||
// TODO: backend
|
||||
o = _mfb.newObject(ByteSpan{sha1_info_hash});
|
||||
_info_to_content[sha1_info_hash] = o;
|
||||
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
|
||||
std::cout << "SHA1_NGCFT1: new message has new content\n";
|
||||
auto o = constructFileMessageInPlace({reg, new_msg_e}, e.file_kind, {e.file_id, e.file_id_size});
|
||||
|
||||
if (o) {
|
||||
// the other peer just sent the file message, so it is likely they have the file
|
||||
addParticipation(c, o);
|
||||
}
|
||||
o.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
|
||||
reg_ptr->emplace<Message::Components::MessageFileObject>(new_msg_e, o);
|
||||
|
||||
// HACK: assume the message sender is participating. usually a safe bet.
|
||||
if (addParticipation(c, o)) {
|
||||
// something happend, update chunk picker
|
||||
assert(static_cast<bool>(c));
|
||||
c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
}
|
||||
|
||||
// HACK: assume the message sender has all
|
||||
o.get_or_emplace<Components::RemoteHaveBitset>().others[c] = {true, {}};
|
||||
|
||||
// TODO: queue info dl
|
||||
// TODO: queue info/check if we already have info
|
||||
if (!o.all_of<Components::ReRequestInfoTimer>() && !o.all_of<Components::FT1InfoSHA1>()) {
|
||||
bool in_info_want {false};
|
||||
for (const auto it : _queue_content_want_info) {
|
||||
if (it == o) {
|
||||
in_info_want = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!in_info_want) {
|
||||
// TODO: check if already receiving
|
||||
_queue_content_want_info.push_back(o);
|
||||
}
|
||||
} else if (o.all_of<Components::FT1InfoSHA1>()){
|
||||
// remove from info want
|
||||
o.remove<Components::ReRequestInfoTimer>();
|
||||
|
||||
auto it = std::find(_queue_content_want_info.cbegin(), _queue_content_want_info.cend(), o);
|
||||
if (it != _queue_content_want_info.cend()) {
|
||||
_queue_content_want_info.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
// since public
|
||||
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
|
||||
|
||||
_os.throwEventUpdate(o);
|
||||
|
||||
_rmm.throwEventConstruct(reg, new_msg_e);
|
||||
|
||||
@ -1340,8 +1407,8 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
|
||||
for (const auto& [_, o] : _info_to_content) {
|
||||
removeParticipation(c, o);
|
||||
|
||||
if (o.all_of<Components::RemoteHaveBitset>()) {
|
||||
o.get<Components::RemoteHaveBitset>().others.erase(c);
|
||||
if (o.all_of<ObjComp::F::RemoteHaveBitset>()) {
|
||||
o.get<ObjComp::F::RemoteHaveBitset>().others.erase(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1383,11 +1450,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
|
||||
|
||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
assert(static_cast<bool>(c));
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
|
||||
|
||||
// we might not know yet
|
||||
if (addParticipation(c, o)) {
|
||||
@ -1395,10 +1460,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
|
||||
//c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
}
|
||||
|
||||
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
|
||||
if (!o.all_of<Components::FT1InfoSHA1>()) {
|
||||
// we dont have the info yet
|
||||
return true;
|
||||
}
|
||||
|
||||
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
|
||||
|
||||
auto& remote_have = o.get_or_emplace<ObjComp::F::RemoteHaveBitset>().others;
|
||||
if (!remote_have.contains(c)) {
|
||||
// init
|
||||
remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
|
||||
remote_have.emplace(c, ObjComp::F::RemoteHaveBitset::Entry{false, num_total_chunks});
|
||||
|
||||
// new have? nice
|
||||
//c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
@ -1424,11 +1496,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
|
||||
a_valid_change = true;
|
||||
}
|
||||
|
||||
if (a_valid_change) {
|
||||
// new have? nice
|
||||
c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
}
|
||||
|
||||
// check for completion?
|
||||
// TODO: optimize
|
||||
bool test_all {true};
|
||||
@ -1445,6 +1512,15 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
|
||||
remote_have_peer.have = BitSet{};
|
||||
}
|
||||
|
||||
if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
|
||||
return true; // we dont care beyond this point
|
||||
}
|
||||
|
||||
if (a_valid_change) {
|
||||
// new have? nice
|
||||
c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1474,6 +1550,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
|
||||
std::cerr << "SHA1_NGCFT1 error: tracking info has null object\n";
|
||||
return false;
|
||||
}
|
||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
assert(static_cast<bool>(c));
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
|
||||
|
||||
// we might not know yet
|
||||
addParticipation(c, o);
|
||||
|
||||
if (!o.all_of<Components::FT1InfoSHA1>()) {
|
||||
// we dont have the info yet
|
||||
return true;
|
||||
}
|
||||
|
||||
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
|
||||
// +7 for byte rounding
|
||||
@ -1483,22 +1570,16 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
assert(static_cast<bool>(c));
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
|
||||
|
||||
// we might not know yet
|
||||
addParticipation(c, o);
|
||||
|
||||
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
|
||||
auto& remote_have = o.get_or_emplace<ObjComp::F::RemoteHaveBitset>().others;
|
||||
if (!remote_have.contains(c)) {
|
||||
// init
|
||||
remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
|
||||
remote_have.emplace(c, ObjComp::F::RemoteHaveBitset::Entry{false, num_total_chunks});
|
||||
}
|
||||
|
||||
auto& remote_have_peer = remote_have.at(c);
|
||||
if (!remote_have_peer.have_all) { // TODO: maybe unset with bitset?
|
||||
BitSet event_bitset{e.chunk_bitset};
|
||||
// TODO: range replace instead
|
||||
remote_have_peer.have.merge(event_bitset, e.start_chunk);
|
||||
|
||||
// check for completion?
|
||||
@ -1548,13 +1629,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have_all& e) {
|
||||
|
||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
assert(static_cast<bool>(c));
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
|
||||
|
||||
// we might not know yet
|
||||
addParticipation(c, o);
|
||||
|
||||
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
|
||||
remote_have[c] = Components::RemoteHaveBitset::Entry{true, {}};
|
||||
auto& remote_have = o.get_or_emplace<ObjComp::F::RemoteHaveBitset>().others;
|
||||
remote_have[c] = ObjComp::F::RemoteHaveBitset::Entry{true, {}};
|
||||
|
||||
// new have? nice
|
||||
c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
@ -1595,21 +1676,27 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) {
|
||||
|
||||
// add to participants
|
||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
|
||||
auto o = itc_it->second;
|
||||
if (addParticipation(c, o)) {
|
||||
// something happend, update chunk picker
|
||||
// !!! this is probably too much
|
||||
assert(static_cast<bool>(c));
|
||||
c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
|
||||
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
|
||||
c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
}
|
||||
|
||||
std::cout << "SHA1_NGCFT1: and we where interested!\n";
|
||||
// we should probably send the bitset back here / add to queue (can be multiple packets)
|
||||
if (o.all_of<Components::FT1ChunkSHA1Cache>() && o.get<Components::FT1ChunkSHA1Cache>().have_count > 0) {
|
||||
queueBitsetSendFull(c, o);
|
||||
} else if (o.all_of<Components::ReAnnounceTimer>()) {
|
||||
o.get<Components::ReAnnounceTimer>().lower();
|
||||
}
|
||||
}
|
||||
|
||||
// return true instead?
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -37,6 +37,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
||||
|
||||
Backends::SHA1MappedFilesystem _mfb;
|
||||
|
||||
bool _object_update_lock {false};
|
||||
|
||||
std::minstd_rand _rng {1337*11};
|
||||
|
||||
using clock = std::chrono::steady_clock;
|
||||
@ -71,9 +73,12 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
||||
std::deque<QBitsetEntry> _queue_send_bitset;
|
||||
|
||||
// FIXME: workaround missing contact events
|
||||
// only used to remove participation on peer exit
|
||||
// only used on peer exit (no, also used to quicken lookups)
|
||||
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
|
||||
|
||||
// reset every iterate; kept here as an allocation optimization
|
||||
entt::dense_map<Contact3, size_t> _peer_open_requests;
|
||||
|
||||
void updateMessages(ObjectHandle ce);
|
||||
|
||||
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ObjectHandle ce);
|
||||
@ -104,6 +109,9 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
||||
|
||||
void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts);
|
||||
|
||||
// construct the file part in a partially constructed message
|
||||
ObjectHandle constructFileMessageInPlace(Message3Handle msg, NGCFT1_file_kind file_kind, ByteSpan file_id);
|
||||
|
||||
protected: // rmm events (actions)
|
||||
bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override;
|
||||
|
||||
|
@ -1,91 +0,0 @@
|
||||
#include "./ngc_hs2.hpp"
|
||||
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
|
||||
NGCHS2::NGCHS2(
|
||||
ToxContactModel2& tcm,
|
||||
ToxEventProviderI& tep,
|
||||
NGCFT1& nft
|
||||
) :
|
||||
_tcm(tcm),
|
||||
_tep_sr(tep.newSubRef(this)),
|
||||
_nft(nft),
|
||||
_nftep_sr(_nft.newSubRef(this))
|
||||
{
|
||||
_tep_sr
|
||||
.subscribe(TOX_EVENT_GROUP_PEER_JOIN)
|
||||
.subscribe(TOX_EVENT_GROUP_PEER_EXIT)
|
||||
;
|
||||
|
||||
_nftep_sr
|
||||
.subscribe(NGCFT1_Event::recv_init)
|
||||
.subscribe(NGCFT1_Event::recv_request)
|
||||
.subscribe(NGCFT1_Event::recv_init)
|
||||
.subscribe(NGCFT1_Event::recv_data)
|
||||
.subscribe(NGCFT1_Event::send_data)
|
||||
.subscribe(NGCFT1_Event::recv_done)
|
||||
.subscribe(NGCFT1_Event::send_done)
|
||||
;
|
||||
}
|
||||
|
||||
NGCHS2::~NGCHS2(void) {
|
||||
}
|
||||
|
||||
float NGCHS2::iterate(float delta) {
|
||||
return 1000.f;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_recv_request& e) {
|
||||
if (
|
||||
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
|
||||
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
|
||||
) {
|
||||
return false; // not for us
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
if (
|
||||
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
|
||||
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
|
||||
) {
|
||||
return false; // not for us
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_recv_data&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_send_data&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_recv_done&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_send_done&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
|
||||
const auto group_number = tox_event_group_peer_join_get_group_number(e);
|
||||
const auto peer_number = tox_event_group_peer_join_get_peer_id(e);
|
||||
|
||||
const auto c = _tcm.getContactGroupPeer(group_number, peer_number);
|
||||
assert(c);
|
||||
|
||||
// add to check list with inital cooldown
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1,44 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
//#include <solanaceae/contact/contact_model3.hpp>
|
||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||
|
||||
//#include <solanaceae/message3/registry_message_model.hpp>
|
||||
|
||||
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||
|
||||
// fwd
|
||||
class ToxContactModel2;
|
||||
|
||||
class NGCHS2 : public ToxEventI, public NGCFT1EventI {
|
||||
ToxContactModel2& _tcm;
|
||||
//Contact3Registry& _cr;
|
||||
//RegistryMessageModelI& _rmm;
|
||||
ToxEventProviderI::SubscriptionReference _tep_sr;
|
||||
NGCFT1& _nft;
|
||||
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
|
||||
|
||||
public:
|
||||
NGCHS2(
|
||||
ToxContactModel2& tcm,
|
||||
ToxEventProviderI& tep,
|
||||
NGCFT1& nf
|
||||
);
|
||||
|
||||
~NGCHS2(void);
|
||||
|
||||
float iterate(float delta);
|
||||
|
||||
protected:
|
||||
bool onEvent(const Events::NGCFT1_recv_request&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_init&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_data&) override;
|
||||
bool onEvent(const Events::NGCFT1_send_data&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_done&) override;
|
||||
bool onEvent(const Events::NGCFT1_send_done&) override;
|
||||
|
||||
protected:
|
||||
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
|
||||
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;
|
||||
};
|
||||
|
532
solanaceae/ngc_hs2/ngc_hs2_rizzler.cpp
Normal file
532
solanaceae/ngc_hs2/ngc_hs2_rizzler.cpp
Normal file
@ -0,0 +1,532 @@
|
||||
#include "./ngc_hs2_rizzler.hpp"
|
||||
|
||||
#include <solanaceae/contact/components.hpp>
|
||||
#include <solanaceae/tox_contacts/components.hpp>
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
#include <solanaceae/message3/contact_components.hpp>
|
||||
#include <solanaceae/message3/registry_message_model.hpp>
|
||||
#include <solanaceae/message3/components.hpp>
|
||||
#include <solanaceae/tox_messages/msg_components.hpp>
|
||||
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
|
||||
|
||||
// TODO: move somewhere else?
|
||||
#include <solanaceae/ngc_ft1_sha1/util.hpp>
|
||||
|
||||
#include <solanaceae/util/span.hpp>
|
||||
|
||||
#include <entt/entity/entity.hpp>
|
||||
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
#include "./serl.hpp"
|
||||
|
||||
#include <cstdint>
|
||||
#include <deque>
|
||||
#include <cstring>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
// TODO: move to own file
|
||||
namespace Components {
|
||||
struct RequestedChatLogs {
|
||||
struct Entry {
|
||||
uint64_t ts_start;
|
||||
uint64_t ts_end;
|
||||
//std::vector<uint8_t> fid; // ?
|
||||
};
|
||||
std::deque<Entry> list;
|
||||
bool contains(uint64_t ts_start, uint64_t ts_end);
|
||||
void addRequest(uint64_t ts_start, uint64_t ts_end);
|
||||
};
|
||||
|
||||
struct RunningChatLogs {
|
||||
struct Entry {
|
||||
uint64_t ts_start;
|
||||
uint64_t ts_end;
|
||||
std::vector<uint8_t> data;
|
||||
float last_activity {0.f};
|
||||
};
|
||||
// list of transfers
|
||||
entt::dense_map<uint8_t, Entry> list;
|
||||
};
|
||||
|
||||
bool RequestedChatLogs::contains(uint64_t ts_start, uint64_t ts_end) {
|
||||
auto it = std::find_if(list.cbegin(), list.cend(), [ts_start, ts_end](const auto& value) {
|
||||
return value.ts_start == ts_start && value.ts_end == ts_end;
|
||||
});
|
||||
return it != list.cend();
|
||||
}
|
||||
|
||||
void RequestedChatLogs::addRequest(uint64_t ts_start, uint64_t ts_end) {
|
||||
if (contains(ts_start, ts_end)) {
|
||||
return; // pre existing
|
||||
}
|
||||
list.push_back(Entry{ts_start, ts_end});
|
||||
}
|
||||
|
||||
} // Components
|
||||
|
||||
// TODO: move to contact reg?
|
||||
static Contact3 findContactByID(Contact3Registry& cr, const std::vector<uint8_t>& id) {
|
||||
// TODO: id lookup table, this is very inefficent
|
||||
for (const auto& [c_it, id_it] : cr.view<Contact::Components::ID>().each()) {
|
||||
if (id == id_it.data) {
|
||||
return c_it;
|
||||
}
|
||||
}
|
||||
|
||||
return entt::null;
|
||||
}
|
||||
|
||||
NGCHS2Rizzler::NGCHS2Rizzler(
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModelI& rmm,
|
||||
ToxContactModel2& tcm,
|
||||
NGCFT1& nft,
|
||||
ToxEventProviderI& tep,
|
||||
SHA1_NGCFT1& sha1_nft
|
||||
) :
|
||||
_cr(cr),
|
||||
_rmm(rmm),
|
||||
_tcm(tcm),
|
||||
_nft(nft),
|
||||
_nftep_sr(_nft.newSubRef(this)),
|
||||
_tep_sr(tep.newSubRef(this)),
|
||||
_sha1_nft(sha1_nft)
|
||||
{
|
||||
_nftep_sr
|
||||
.subscribe(NGCFT1_Event::recv_init)
|
||||
.subscribe(NGCFT1_Event::recv_data)
|
||||
.subscribe(NGCFT1_Event::recv_done)
|
||||
;
|
||||
_tep_sr
|
||||
.subscribe(Tox_Event_Type::TOX_EVENT_GROUP_PEER_JOIN)
|
||||
;
|
||||
}
|
||||
|
||||
NGCHS2Rizzler::~NGCHS2Rizzler(void) {
|
||||
}
|
||||
|
||||
float NGCHS2Rizzler::iterate(float delta) {
|
||||
for (auto it = _request_queue.begin(); it != _request_queue.end();) {
|
||||
it->second.timer += delta;
|
||||
|
||||
if (it->second.timer < it->second.delay) {
|
||||
it++;
|
||||
continue;
|
||||
}
|
||||
|
||||
const Contact3Handle c {_cr, it->first};
|
||||
|
||||
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
|
||||
// peer nolonger online
|
||||
it = _request_queue.erase(it);
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
|
||||
|
||||
// now in sec
|
||||
const uint64_t ts_now = Message::getTimeMS()/1000;
|
||||
|
||||
const uint64_t ts_start = ts_now;
|
||||
const uint64_t ts_end = ts_now-(60*60*48);
|
||||
|
||||
if (sendRequest(group_number, peer_number, ts_start, ts_end)) {
|
||||
// TODO: requeue
|
||||
// TODO: segment
|
||||
// TODO: dont request already received ranges
|
||||
|
||||
//// on success, requeue with longer delay (minutes)
|
||||
|
||||
//it->second.timer = 0.f;
|
||||
//it->second.delay = _delay_next_request_min + _rng_dist(_rng)*_delay_next_request_add;
|
||||
|
||||
//// double the delay for overlap (9m-15m)
|
||||
//// TODO: finetune
|
||||
//it->second.sync_delta = uint8_t((it->second.delay/60.f)*2.f) + 1;
|
||||
|
||||
//std::cout << "ZOX #### requeued request in " << it->second.delay << "s\n";
|
||||
|
||||
auto& rcl = c.get_or_emplace<Components::RequestedChatLogs>();
|
||||
rcl.addRequest(ts_start, ts_end);
|
||||
} else {
|
||||
// on failure, assume disconnected
|
||||
}
|
||||
|
||||
// remove from request queue
|
||||
it = _request_queue.erase(it);
|
||||
}
|
||||
|
||||
return 1000.f;
|
||||
}
|
||||
|
||||
bool NGCHS2Rizzler::sendRequest(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
uint64_t ts_start, uint64_t ts_end
|
||||
) {
|
||||
std::cout << "NGCHS2Rizzler: sending request to " << group_number << ":" << peer_number << " (" << ts_start << "," << ts_end << ")\n";
|
||||
|
||||
// build fid
|
||||
std::vector<uint8_t> fid;
|
||||
fid.reserve(sizeof(uint64_t)+sizeof(uint64_t));
|
||||
|
||||
serlSimpleType(fid, ts_start);
|
||||
serlSimpleType(fid, ts_end);
|
||||
|
||||
assert(fid.size() == sizeof(uint64_t)+sizeof(uint64_t));
|
||||
|
||||
return _nft.NGC_FT1_send_request_private(
|
||||
group_number, peer_number,
|
||||
(uint32_t)NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK,
|
||||
fid.data(), fid.size() // fid
|
||||
);
|
||||
}
|
||||
|
||||
void NGCHS2Rizzler::handleMsgPack(Contact3Handle sync_by_c, const std::vector<uint8_t>& data) {
|
||||
assert(sync_by_c);
|
||||
|
||||
auto* reg_ptr = _rmm.get(sync_by_c);
|
||||
if (reg_ptr == nullptr) {
|
||||
std::cerr << "NGCHS2Rizzler error: group without msg reg\n";
|
||||
return;
|
||||
}
|
||||
|
||||
Message3Registry& reg = *reg_ptr;
|
||||
|
||||
uint64_t now_ts = Message::getTimeMS();
|
||||
|
||||
std::cout << "NGCHS2Rizzler: start parsing msgpack chatlog from " << entt::to_integral(sync_by_c.entity()) << "\n";
|
||||
try {
|
||||
const auto j = nlohmann::json::from_msgpack(data);
|
||||
if (!j.is_array()) {
|
||||
std::cerr << "NGCHS2Rizzler error: chatlog not array\n";
|
||||
return;
|
||||
}
|
||||
|
||||
std::cout << "NGCHS2Rizzler: chatlog has " << j.size() << " entries\n";
|
||||
|
||||
for (const auto j_entry : j) {
|
||||
try {
|
||||
// deci seconds
|
||||
uint64_t ts = j_entry.at("ts");
|
||||
// TODO: check against ts range
|
||||
|
||||
ts *= 100; // convert to ms
|
||||
|
||||
const auto& j_ppk = j_entry.at("ppk");
|
||||
|
||||
uint32_t mid = j_entry.at("mid");
|
||||
|
||||
if (
|
||||
!(j_entry.count("text")) &&
|
||||
!(j_entry.count("fkind") && j_entry.count("fid"))
|
||||
) {
|
||||
std::cerr << "NGCHS2Rizzler error: msg neither contains text nor file fields\n";
|
||||
continue;
|
||||
}
|
||||
|
||||
Contact3 from_c{entt::null};
|
||||
{ // from_c
|
||||
std::vector<uint8_t> id;
|
||||
if (j_ppk.is_binary()) {
|
||||
id = j_ppk.get_binary();
|
||||
} else {
|
||||
j_ppk.at("bytes").get_to(id);
|
||||
}
|
||||
|
||||
from_c = findContactByID(_cr, id);
|
||||
|
||||
if (!_cr.valid(from_c)) {
|
||||
// create sparse contact with id only
|
||||
from_c = _cr.create();
|
||||
_cr.emplace_or_replace<Contact::Components::ID>(from_c, id);
|
||||
|
||||
// TODO: only if public message
|
||||
_cr.emplace_or_replace<Contact::Components::Parent>(from_c, sync_by_c.get<Contact::Components::Parent>().parent);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: from_c perm check
|
||||
// hard to do without numbers
|
||||
|
||||
Message3Handle new_real_msg{reg, reg.create()};
|
||||
|
||||
new_real_msg.emplace<Message::Components::Timestamp>(ts); // reactive?
|
||||
|
||||
new_real_msg.emplace<Message::Components::ContactFrom>(from_c);
|
||||
new_real_msg.emplace<Message::Components::ContactTo>(sync_by_c.get<Contact::Components::Parent>().parent);
|
||||
|
||||
new_real_msg.emplace<Message::Components::ToxGroupMessageID>(mid);
|
||||
|
||||
if (j_entry.contains("action") && static_cast<bool>(j_entry.at("action"))) {
|
||||
new_real_msg.emplace<Message::Components::TagMessageIsAction>();
|
||||
}
|
||||
|
||||
if (j_entry.contains("text")) {
|
||||
const std::string& text = j_entry.at("text");
|
||||
|
||||
new_real_msg.emplace<Message::Components::MessageText>(text);
|
||||
|
||||
#if 0
|
||||
std::cout
|
||||
<< "msg ts:" << ts
|
||||
//<< " ppk:" << j_ppk
|
||||
<< " mid:" << mid
|
||||
<< " type:" << type
|
||||
<< " text:" << text
|
||||
<< "\n"
|
||||
;
|
||||
#endif
|
||||
} else if (j_entry.contains("fkind") && j_entry.contains("fid")) {
|
||||
uint32_t fkind = j_entry.at("fkind");
|
||||
|
||||
const auto& j_fid = j_entry.at("fid");
|
||||
|
||||
std::vector<uint8_t> fid;
|
||||
if (j_fid.is_binary()) {
|
||||
fid = j_fid.get_binary();
|
||||
} else {
|
||||
j_fid.at("bytes").get_to(fid);
|
||||
}
|
||||
|
||||
if (fkind == (uint32_t)NGCFT1_file_kind::HASH_SHA1_INFO) {
|
||||
_sha1_nft.constructFileMessageInPlace(
|
||||
new_real_msg,
|
||||
NGCFT1_file_kind::HASH_SHA1_INFO,
|
||||
ByteSpan{fid}
|
||||
);
|
||||
} else {
|
||||
std::cerr << "NGCHS2Rizzler error: unknown file kind " << fkind << "\n";
|
||||
}
|
||||
|
||||
#if 0
|
||||
std::cout
|
||||
<< "msg ts:" << ts
|
||||
//<< " ppk:" << j_ppk
|
||||
<< " mid:" << mid
|
||||
<< " type:" << type
|
||||
<< " fkind:" << fkind
|
||||
<< " fid:" << j_fid
|
||||
<< "\n"
|
||||
;
|
||||
#endif
|
||||
}
|
||||
|
||||
// now check against pre existing
|
||||
// TODO: dont do this afterwards
|
||||
Message3Handle dup_msg{};
|
||||
{ // check preexisting
|
||||
// get comparator from contact
|
||||
const Contact3Handle reg_c {_cr, reg.ctx().get<Contact3>()};
|
||||
if (reg_c.all_of<Contact::Components::MessageIsSame>()) {
|
||||
auto& comp = reg_c.get<Contact::Components::MessageIsSame>().comp;
|
||||
// walking EVERY existing message OOF
|
||||
// this needs optimizing
|
||||
for (const Message3 other_msg : reg.view<Message::Components::Timestamp, Message::Components::ContactFrom, Message::Components::ContactTo>()) {
|
||||
if (other_msg == new_real_msg) {
|
||||
continue; // skip self
|
||||
}
|
||||
|
||||
if (comp({reg, other_msg}, new_real_msg)) {
|
||||
// dup
|
||||
dup_msg = {reg, other_msg};
|
||||
break;
|
||||
}
|
||||
}
|
||||
} // else, default heuristic??
|
||||
}
|
||||
|
||||
Message3Handle new_msg = new_real_msg;
|
||||
|
||||
if (dup_msg) {
|
||||
// we leak objects here (if file)
|
||||
reg.destroy(new_msg);
|
||||
new_msg = dup_msg;
|
||||
}
|
||||
|
||||
{ // by whom
|
||||
auto& synced_by = new_msg.get_or_emplace<Message::Components::SyncedBy>().ts;
|
||||
// dont overwrite
|
||||
synced_by.try_emplace(sync_by_c, now_ts);
|
||||
}
|
||||
|
||||
{ // now we also know they got the message
|
||||
auto& list = new_msg.get_or_emplace<Message::Components::ReceivedBy>().ts;
|
||||
// dont overwrite
|
||||
list.try_emplace(sync_by_c, now_ts);
|
||||
}
|
||||
|
||||
if (new_msg == dup_msg) {
|
||||
// TODO: maybe update a timestamp?
|
||||
_rmm.throwEventUpdate(reg, new_msg);
|
||||
} else {
|
||||
// pure new msg
|
||||
|
||||
new_msg.emplace<Message::Components::TimestampProcessed>(now_ts);
|
||||
new_msg.emplace<Message::Components::TimestampWritten>(ts);
|
||||
|
||||
new_msg.emplace<Message::Components::TagUnread>();
|
||||
_rmm.throwEventConstruct(reg, new_msg);
|
||||
}
|
||||
} catch (...) {
|
||||
std::cerr << "NGCHS2Rizzler error: parsing entry '" << j_entry.dump() << "'\n";
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
std::cerr << "NGCHS2Rizzler error: failed parsing data as msgpack\n";
|
||||
}
|
||||
}
|
||||
|
||||
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
if (e.file_kind != NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK) {
|
||||
return false; // not for us
|
||||
}
|
||||
|
||||
std::cout << "NGCHS2Rizzler: recv_init " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
|
||||
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
if (!c) {
|
||||
return false; // huh?
|
||||
}
|
||||
|
||||
if (!c.all_of<Components::RequestedChatLogs>()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// parse start end
|
||||
// TODO: extract
|
||||
ByteSpan fid{e.file_id, e.file_id_size};
|
||||
// TODO: better size check
|
||||
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
|
||||
std::cerr << "NGCHS2S error: range not lange enough\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
// seconds
|
||||
uint64_t ts_start{0};
|
||||
uint64_t ts_end{0};
|
||||
|
||||
// parse
|
||||
try {
|
||||
ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)};
|
||||
ts_start = deserlTS(ts_start_bytes);
|
||||
|
||||
ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)};
|
||||
ts_end = deserlTS(ts_end_bytes);
|
||||
} catch (...) {
|
||||
std::cerr << "NGCHS2R error: failed to parse range\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
if (ts_end >= ts_start) {
|
||||
std::cerr << "NGCHS2R error: end not < start\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
auto& reqcl = c.get<Components::RequestedChatLogs>();
|
||||
|
||||
if (!reqcl.contains(ts_start, ts_end)) {
|
||||
// warn?
|
||||
return true;
|
||||
}
|
||||
|
||||
auto& rnncl = c.get_or_emplace<Components::RunningChatLogs>();
|
||||
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
|
||||
|
||||
auto& transfer = rnncl.list[e.transfer_id];
|
||||
transfer.data.reserve(e.file_size); // danger?
|
||||
transfer.last_activity = 0.f;
|
||||
transfer.ts_start = ts_start;
|
||||
transfer.ts_end = ts_end;
|
||||
|
||||
e.accept = true;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_data& e) {
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
if (!c) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!c.all_of<Components::RunningChatLogs>()) {
|
||||
return false; // not ours
|
||||
}
|
||||
|
||||
auto& rnncl = c.get<Components::RunningChatLogs>();
|
||||
if (!rnncl.list.count(e.transfer_id)) {
|
||||
return false; // not ours
|
||||
}
|
||||
|
||||
std::cout << "NGCHS2Rizzler: recv_data " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << " " << e.data_size << "@" << e.data_offset << "\n";
|
||||
|
||||
auto& transfer = rnncl.list.at(e.transfer_id);
|
||||
transfer.data.resize(e.data_offset+e.data_size);
|
||||
std::memcpy(&transfer.data[e.data_offset], e.data, e.data_size);
|
||||
|
||||
transfer.last_activity = 0.f;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
// FIXME: this does not work, tcm just delteded the relation ship
|
||||
//auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
//if (!c) {
|
||||
// return false;
|
||||
//}
|
||||
const auto c_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
|
||||
if (c_it == _tox_peer_to_contact.end()) {
|
||||
return false;
|
||||
}
|
||||
auto c = c_it->second;
|
||||
if (!static_cast<bool>(c)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!c.all_of<Components::RunningChatLogs>()) {
|
||||
return false; // not ours
|
||||
}
|
||||
|
||||
auto& rnncl = c.get<Components::RunningChatLogs>();
|
||||
if (!rnncl.list.count(e.transfer_id)) {
|
||||
return false; // not ours
|
||||
}
|
||||
|
||||
std::cout << "NGCHS2Rizzler: recv_done " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
|
||||
{
|
||||
auto& transfer = rnncl.list.at(e.transfer_id);
|
||||
// TODO: done might mean failed, so we might be parsing bs here
|
||||
|
||||
// use data
|
||||
// TODO: move out of packet handler
|
||||
handleMsgPack(c, transfer.data);
|
||||
}
|
||||
rnncl.list.erase(e.transfer_id);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool NGCHS2Rizzler::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
|
||||
const auto group_number = tox_event_group_peer_join_get_group_number(e);
|
||||
const auto peer_number = tox_event_group_peer_join_get_peer_id(e);
|
||||
|
||||
const auto c = _tcm.getContactGroupPeer(group_number, peer_number);
|
||||
|
||||
if (!c) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!_request_queue.count(c)) {
|
||||
_request_queue[c] = {
|
||||
_delay_before_first_request_min + _rng_dist(_rng)*_delay_before_first_request_add,
|
||||
0.f,
|
||||
0,
|
||||
};
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
73
solanaceae/ngc_hs2/ngc_hs2_rizzler.hpp
Normal file
73
solanaceae/ngc_hs2/ngc_hs2_rizzler.hpp
Normal file
@ -0,0 +1,73 @@
|
||||
#pragma once
|
||||
|
||||
#include <solanaceae/contact/contact_model3.hpp>
|
||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||
|
||||
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||
#include <solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp>
|
||||
|
||||
// fwd
|
||||
class ToxContactModel2;
|
||||
class RegistryMessageModelI;
|
||||
|
||||
|
||||
class NGCHS2Rizzler : public ToxEventI, public NGCFT1EventI {
|
||||
Contact3Registry& _cr;
|
||||
RegistryMessageModelI& _rmm;
|
||||
ToxContactModel2& _tcm;
|
||||
NGCFT1& _nft;
|
||||
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
|
||||
ToxEventProviderI::SubscriptionReference _tep_sr;
|
||||
SHA1_NGCFT1& _sha1_nft;
|
||||
|
||||
// 5s-6s
|
||||
const float _delay_before_first_request_min {5.f};
|
||||
const float _delay_before_first_request_add {1.f};
|
||||
|
||||
std::uniform_real_distribution<float> _rng_dist {0.0f, 1.0f};
|
||||
std::minstd_rand _rng;
|
||||
|
||||
struct RequestQueueInfo {
|
||||
float delay; // const
|
||||
float timer;
|
||||
uint64_t sync_delta; //?
|
||||
};
|
||||
// request queue
|
||||
// c -> delay, timer
|
||||
std::map<Contact3, RequestQueueInfo> _request_queue;
|
||||
|
||||
// FIXME: workaround missing contact events
|
||||
// only used on peer exit (no, also used to quicken lookups)
|
||||
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
|
||||
|
||||
public:
|
||||
NGCHS2Rizzler(
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModelI& rmm,
|
||||
ToxContactModel2& tcm,
|
||||
NGCFT1& nft,
|
||||
ToxEventProviderI& tep,
|
||||
SHA1_NGCFT1& sha1_nft
|
||||
);
|
||||
|
||||
~NGCHS2Rizzler(void);
|
||||
|
||||
float iterate(float delta);
|
||||
|
||||
protected:
|
||||
bool sendRequest(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
uint64_t ts_start, uint64_t ts_end
|
||||
);
|
||||
|
||||
void handleMsgPack(Contact3Handle c, const std::vector<uint8_t>& data);
|
||||
|
||||
protected:
|
||||
bool onEvent(const Events::NGCFT1_recv_init&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_data&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_done&) override;
|
||||
|
||||
protected:
|
||||
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
|
||||
};
|
||||
|
460
solanaceae/ngc_hs2/ngc_hs2_sigma.cpp
Normal file
460
solanaceae/ngc_hs2/ngc_hs2_sigma.cpp
Normal file
@ -0,0 +1,460 @@
|
||||
#include "./ngc_hs2_sigma.hpp"
|
||||
|
||||
#include <solanaceae/util/span.hpp>
|
||||
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
|
||||
#include <solanaceae/contact/components.hpp>
|
||||
#include <solanaceae/tox_contacts/components.hpp>
|
||||
#include <solanaceae/message3/components.hpp>
|
||||
#include <solanaceae/tox_messages/msg_components.hpp>
|
||||
|
||||
//#include <solanaceae/tox_messages/obj_components.hpp>
|
||||
// TODO: this is kinda bad, needs improvement
|
||||
// use tox fileid/filekind instead !
|
||||
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
|
||||
#include <solanaceae/ngc_ft1_sha1/components.hpp>
|
||||
|
||||
// TODO: move somewhere else?
|
||||
#include <solanaceae/ngc_ft1_sha1/util.hpp>
|
||||
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
#include "./serl.hpp"
|
||||
|
||||
#include "./ts_find_start.hpp"
|
||||
|
||||
#include <iostream>
|
||||
|
||||
// https://www.youtube.com/watch?v=AdAqsgga3qo
|
||||
|
||||
// TODO: move to own file
|
||||
namespace Components {
|
||||
|
||||
struct IncommingTimeRangeRequestQueue {
|
||||
struct Entry {
|
||||
TimeRangeRequest ir;
|
||||
std::vector<uint8_t> fid;
|
||||
};
|
||||
std::deque<Entry> _queue;
|
||||
|
||||
// we should remove/notadd queued requests
|
||||
// that are subsets of same or larger ranges
|
||||
void queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid);
|
||||
};
|
||||
|
||||
struct IncommingTimeRangeRequestRunning {
|
||||
struct Entry {
|
||||
TimeRangeRequest ir;
|
||||
std::vector<uint8_t> data; // transfer data in memory
|
||||
float last_activity {0.f};
|
||||
};
|
||||
entt::dense_map<uint8_t, Entry> _list;
|
||||
};
|
||||
|
||||
void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid) {
|
||||
// TODO: do more than exact dedupe
|
||||
for (const auto& [time_range, _] : _queue) {
|
||||
if (time_range.ts_start == new_request.ts_start && time_range.ts_end == new_request.ts_end) {
|
||||
return; // already enqueued
|
||||
// TODO: what about fid?
|
||||
}
|
||||
}
|
||||
|
||||
_queue.emplace_back(Entry{
|
||||
new_request,
|
||||
std::vector<uint8_t>{fid.cbegin(), fid.cend()}
|
||||
});
|
||||
}
|
||||
|
||||
} // Components
|
||||
|
||||
|
||||
NGCHS2Sigma::NGCHS2Sigma(
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModelI& rmm,
|
||||
ToxContactModel2& tcm,
|
||||
NGCFT1& nft
|
||||
) :
|
||||
_cr(cr),
|
||||
_rmm(rmm),
|
||||
_tcm(tcm),
|
||||
_nft(nft),
|
||||
_nftep_sr(_nft.newSubRef(this))
|
||||
{
|
||||
_nftep_sr
|
||||
.subscribe(NGCFT1_Event::recv_request)
|
||||
.subscribe(NGCFT1_Event::send_data)
|
||||
.subscribe(NGCFT1_Event::send_done)
|
||||
;
|
||||
}
|
||||
|
||||
NGCHS2Sigma::~NGCHS2Sigma(void) {
|
||||
}
|
||||
|
||||
float NGCHS2Sigma::iterate(float delta) {
|
||||
// limit how often we update here (new fts usually)
|
||||
if (_iterate_heat > 0.f) {
|
||||
_iterate_heat -= delta;
|
||||
return 1000.f; // return heat?
|
||||
} else {
|
||||
_iterate_heat = _iterate_cooldown;
|
||||
}
|
||||
|
||||
// work request queue
|
||||
// check if already running, discard
|
||||
|
||||
auto fn_iirq = [this](auto&& view) {
|
||||
for (auto&& [cv, iirq] : view.each()) {
|
||||
if (iirq._queue.empty()) {
|
||||
// TODO: remove comp?
|
||||
continue;
|
||||
}
|
||||
|
||||
Contact3Handle c{_cr, cv};
|
||||
auto& iirr = c.get_or_emplace<Components::IncommingTimeRangeRequestRunning>();
|
||||
|
||||
// dedup queued from running
|
||||
|
||||
if (iirr._list.size() >= _max_parallel_per_peer) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// new ft here
|
||||
// TODO: loop? nah just 1 per tick is enough
|
||||
const auto request_entry = iirq._queue.front(); // copy
|
||||
assert(!request_entry.fid.empty());
|
||||
|
||||
if (!c.all_of<Contact::Components::Parent>()) {
|
||||
iirq._queue.pop_front();
|
||||
continue; // how
|
||||
}
|
||||
const Contact3Handle group_c = {*c.registry(), c.get<Contact::Components::Parent>().parent};
|
||||
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
|
||||
iirq._queue.pop_front();
|
||||
continue;
|
||||
}
|
||||
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
|
||||
_tox_peer_to_contact[combine_ids(group_number, peer_number)] = c; // cache
|
||||
|
||||
// TODO: check allowed range here
|
||||
//_max_time_into_past_default
|
||||
|
||||
// potentially heavy op
|
||||
auto data = buildChatLogFileRange(group_c, request_entry.ir.ts_start, request_entry.ir.ts_end);
|
||||
|
||||
uint8_t transfer_id {0};
|
||||
if (!_nft.NGC_FT1_send_init_private(
|
||||
group_number, peer_number,
|
||||
(uint32_t)NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK,
|
||||
request_entry.fid.data(), request_entry.fid.size(),
|
||||
data.size(),
|
||||
&transfer_id,
|
||||
true // can_compress (does nothing rn)
|
||||
)) {
|
||||
// sending failed, we do not pop but wait for next iterate
|
||||
// TODO: cache data
|
||||
// TODO: fail counter
|
||||
// actually, fail probably means offline, so delete?
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(iirr._list.count(transfer_id) == 0);
|
||||
iirr._list[transfer_id] = {request_entry.ir, data};
|
||||
|
||||
iirq._queue.pop_front();
|
||||
}
|
||||
};
|
||||
|
||||
// first handle range requests on weak self
|
||||
fn_iirq(_cr.view<Components::IncommingTimeRangeRequestQueue, Contact::Components::TagSelfWeak>());
|
||||
|
||||
// we could stop here, if too much is already running
|
||||
|
||||
// then range on others
|
||||
fn_iirq(_cr.view<Components::IncommingTimeRangeRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
|
||||
|
||||
_cr.view<Components::IncommingTimeRangeRequestRunning>().each(
|
||||
[delta](const auto cv, Components::IncommingTimeRangeRequestRunning& irr) {
|
||||
std::vector<uint8_t> to_remove;
|
||||
for (auto&& [ft_id, entry] : irr._list) {
|
||||
entry.last_activity += delta;
|
||||
if (entry.last_activity >= 60.f) {
|
||||
to_remove.push_back(ft_id);
|
||||
}
|
||||
}
|
||||
for (const auto it : to_remove) {
|
||||
std::cout << "NGCHS2Sigma warning: timed out ." << (int)it << "\n";
|
||||
// TODO: need a way to tell ft?
|
||||
irr._list.erase(it);
|
||||
// technically we are not supposed to timeout and instead rely on the done event
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return 1000.f;
|
||||
}
|
||||
|
||||
void NGCHS2Sigma::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
|
||||
ByteSpan fid{e.file_id, e.file_id_size};
|
||||
// TODO: better size check
|
||||
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
|
||||
std::cerr << "NGCHS2S error: range not lange enough\n";
|
||||
return;
|
||||
}
|
||||
|
||||
// seconds
|
||||
uint64_t ts_start{0};
|
||||
uint64_t ts_end{0};
|
||||
|
||||
// parse
|
||||
try {
|
||||
ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)};
|
||||
ts_start = deserlTS(ts_start_bytes);
|
||||
|
||||
ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)};
|
||||
ts_end = deserlTS(ts_end_bytes);
|
||||
} catch (...) {
|
||||
std::cerr << "NGCHS2S error: failed to parse range\n";
|
||||
return;
|
||||
}
|
||||
|
||||
if (ts_end >= ts_start) {
|
||||
std::cerr << "NGCHS2S error: end not < start\n";
|
||||
return;
|
||||
}
|
||||
|
||||
// dedupe insert into queue
|
||||
// how much overlap do we allow?
|
||||
c.get_or_emplace<Components::IncommingTimeRangeRequestQueue>().queueRequest(
|
||||
{ts_start, ts_end},
|
||||
fid
|
||||
);
|
||||
}
|
||||
|
||||
std::vector<uint8_t> NGCHS2Sigma::buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end) {
|
||||
const Message3Registry* reg_ptr = static_cast<const RegistryMessageModelI&>(_rmm).get(c);
|
||||
if (reg_ptr == nullptr) {
|
||||
return {};
|
||||
}
|
||||
const Message3Registry& msg_reg = *reg_ptr;
|
||||
|
||||
|
||||
if (msg_reg.storage<Message::Components::Timestamp>() == nullptr) {
|
||||
// nothing to do here
|
||||
return {};
|
||||
}
|
||||
|
||||
std::cout << "NGCHS2Sigma: building chatlog for time range " << ts_start-ts_end << "s\n";
|
||||
|
||||
// convert seconds to milliseconds
|
||||
// TODO: lift out?
|
||||
ts_start *= 1000;
|
||||
ts_end *= 1000;
|
||||
|
||||
//std::cout << "!!!! starting msg ts search, ts_start:" << ts_start << " ts_end:" << ts_end << "\n";
|
||||
|
||||
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
|
||||
|
||||
// we iterate "forward", so from newest to oldest
|
||||
|
||||
// start is the newest ts
|
||||
const auto ts_start_it = find_start_by_ts(ts_view, ts_start);
|
||||
// end is the oldest ts
|
||||
|
||||
// we only search for the start point, because we walk to the end anyway
|
||||
|
||||
auto j_array = nlohmann::json::array_t{};
|
||||
|
||||
// hmm
|
||||
// maybe use other view or something?
|
||||
for (auto it = ts_start_it; it != ts_view.end(); it++) {
|
||||
const auto e = *it;
|
||||
const auto& [ts_comp] = ts_view.get(e);
|
||||
|
||||
if (ts_comp.ts > ts_start) {
|
||||
std::cerr << "!!!! msg ent in view too new\n";
|
||||
continue;
|
||||
} else if (ts_comp.ts < ts_end) {
|
||||
// too old, we hit the end of the range
|
||||
break;
|
||||
}
|
||||
|
||||
if (!msg_reg.all_of<
|
||||
Message::Components::ContactFrom,
|
||||
Message::Components::ContactTo,
|
||||
Message::Components::ToxGroupMessageID
|
||||
>(e)) {
|
||||
continue; // ??
|
||||
}
|
||||
if (!msg_reg.any_of<Message::Components::MessageText, Message::Components::MessageFileObject>(e)) {
|
||||
continue; // skip
|
||||
}
|
||||
|
||||
const auto& [c_from_c, c_to_c] = msg_reg.get<Message::Components::ContactFrom, Message::Components::ContactTo>(e);
|
||||
|
||||
if (c_to_c.c != c) {
|
||||
// message was not public
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!_cr.valid(c_from_c.c)) {
|
||||
continue; // ???
|
||||
}
|
||||
|
||||
Contact3Handle c_from{_cr, c_from_c.c};
|
||||
|
||||
if (!c_from.all_of<Contact::Components::ToxGroupPeerPersistent>()) {
|
||||
continue; // ???
|
||||
}
|
||||
|
||||
if (_only_send_self_observed && msg_reg.all_of<Message::Components::SyncedBy>(e) && c.all_of<Contact::Components::Self>()) {
|
||||
if (!msg_reg.get<Message::Components::SyncedBy>(e).ts.count(c.get<Contact::Components::Self>().self)) {
|
||||
continue; // did not observe ourselfs, skip
|
||||
}
|
||||
}
|
||||
|
||||
auto j_entry = nlohmann::json::object_t{};
|
||||
|
||||
j_entry["ts"] = ts_comp.ts/100; // millisec -> decisec
|
||||
{
|
||||
const auto& ppk_ref = c_from.get<Contact::Components::ToxGroupPeerPersistent>().peer_key.data;
|
||||
j_entry["ppk"] = nlohmann::json::binary_t{std::vector<uint8_t>{ppk_ref.cbegin(), ppk_ref.cend()}};
|
||||
}
|
||||
j_entry["mid"] = msg_reg.get<Message::Components::ToxGroupMessageID>(e).id;
|
||||
|
||||
if (msg_reg.all_of<Message::Components::TagMessageIsAction>(e)) {
|
||||
j_entry["action"] = true;
|
||||
}
|
||||
|
||||
if (msg_reg.all_of<Message::Components::MessageText>(e)) {
|
||||
j_entry["text"] = msg_reg.get<Message::Components::MessageText>(e).text;
|
||||
} else if (msg_reg.any_of<Message::Components::MessageFileObject>(e)) {
|
||||
const auto& o = msg_reg.get<Message::Components::MessageFileObject>(e).o;
|
||||
if (!o) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// HACK: use tox fild_id and file_kind instead!!
|
||||
if (o.all_of<Components::FT1InfoSHA1Hash>()) {
|
||||
j_entry["fkind"] = NGCFT1_file_kind::HASH_SHA1_INFO;
|
||||
j_entry["fid"] = nlohmann::json::binary_t{o.get<Components::FT1InfoSHA1Hash>().hash};
|
||||
} else {
|
||||
continue; // unknown file type
|
||||
}
|
||||
}
|
||||
|
||||
j_array.push_back(j_entry);
|
||||
}
|
||||
|
||||
std::cout << "NGCHS2Sigma: built chat log with " << j_array.size() << " entries\n";
|
||||
|
||||
return nlohmann::json::to_msgpack(j_array);
|
||||
}
|
||||
|
||||
bool NGCHS2Sigma::onEvent(const Message::Events::MessageConstruct&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2Sigma::onEvent(const Message::Events::MessageUpdated&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2Sigma::onEvent(const Message::Events::MessageDestory&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_recv_request& e) {
|
||||
if (
|
||||
e.file_kind != NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK
|
||||
) {
|
||||
return false; // not for us
|
||||
}
|
||||
|
||||
// TODO: when is it done from queue?
|
||||
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
if (!c) {
|
||||
return false; // how
|
||||
}
|
||||
|
||||
// is other peer allowed to make requests
|
||||
//bool quick_allow {false};
|
||||
bool quick_allow {true}; // HACK: disable all restrictions for this early test
|
||||
// TODO: quick deny?
|
||||
{
|
||||
// - tagged as weakself
|
||||
if (!quick_allow && c.all_of<Contact::Components::TagSelfWeak>()) {
|
||||
quick_allow = true;
|
||||
}
|
||||
|
||||
// - sub perm level??
|
||||
// - out of max time range (ft specific, not a quick_allow)
|
||||
}
|
||||
|
||||
if (e.file_kind == NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK) {
|
||||
handleTimeRange(c, e);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_send_data& e) {
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
if (!c) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!c.all_of<Components::IncommingTimeRangeRequestRunning>()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto& irr = c.get<Components::IncommingTimeRangeRequestRunning>();
|
||||
if (!irr._list.count(e.transfer_id)) {
|
||||
return false; // not for us (maybe)
|
||||
}
|
||||
|
||||
auto& transfer = irr._list.at(e.transfer_id);
|
||||
if (transfer.data.size() < e.data_offset+e.data_size) {
|
||||
std::cerr << "NGCHS2Sigma error: ft send data larger then file???\n";
|
||||
assert(false && "how");
|
||||
}
|
||||
std::memcpy(e.data, transfer.data.data()+e.data_offset, e.data_size);
|
||||
transfer.last_activity = 0.f;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_send_done& e) {
|
||||
// TODO: this will return null if the peer just disconnected
|
||||
// FIXME: this does not work, tcm just delteded the relation ship
|
||||
//auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
//if (!c) {
|
||||
// return false;
|
||||
//}
|
||||
const auto c_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
|
||||
if (c_it == _tox_peer_to_contact.end()) {
|
||||
return false;
|
||||
}
|
||||
auto c = c_it->second;
|
||||
if (!static_cast<bool>(c)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!c.all_of<Components::IncommingTimeRangeRequestRunning>()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto& irr = c.get<Components::IncommingTimeRangeRequestRunning>();
|
||||
if (!irr._list.count(e.transfer_id)) {
|
||||
return false; // not for us (maybe)
|
||||
}
|
||||
|
||||
irr._list.erase(e.transfer_id);
|
||||
|
||||
// TODO: check if we completed it
|
||||
std::cout << "NGCHS2Sigma: sent chatlog to " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
|
||||
|
||||
return true;
|
||||
}
|
||||
|
82
solanaceae/ngc_hs2/ngc_hs2_sigma.hpp
Normal file
82
solanaceae/ngc_hs2/ngc_hs2_sigma.hpp
Normal file
@ -0,0 +1,82 @@
|
||||
#pragma once
|
||||
|
||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||
|
||||
#include <solanaceae/contact/contact_model3.hpp>
|
||||
#include <solanaceae/message3/registry_message_model.hpp>
|
||||
|
||||
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||
|
||||
#include <entt/container/dense_map.hpp>
|
||||
|
||||
#include <solanaceae/util/span.hpp>
|
||||
|
||||
#include <vector>
|
||||
#include <deque>
|
||||
|
||||
// fwd
|
||||
class ToxContactModel2;
|
||||
|
||||
|
||||
struct TimeRangeRequest {
|
||||
uint64_t ts_start{0};
|
||||
uint64_t ts_end{0};
|
||||
};
|
||||
|
||||
class NGCHS2Sigma : public RegistryMessageModelEventI, public NGCFT1EventI {
|
||||
Contact3Registry& _cr;
|
||||
RegistryMessageModelI& _rmm;
|
||||
ToxContactModel2& _tcm;
|
||||
NGCFT1& _nft;
|
||||
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
|
||||
|
||||
float _iterate_heat {0.f};
|
||||
constexpr static float _iterate_cooldown {1.22f}; // sec
|
||||
|
||||
// open/running range requests (by c)
|
||||
// comp on peer c
|
||||
|
||||
// open/running range responses (by c)
|
||||
// comp on peer c
|
||||
|
||||
// limit to 2 uploads per peer simultaniously
|
||||
// TODO: increase for prod (4?) or maybe even lower?
|
||||
// currently per type
|
||||
constexpr static size_t _max_parallel_per_peer {2};
|
||||
|
||||
constexpr static bool _only_send_self_observed {true};
|
||||
constexpr static int64_t _max_time_into_past_default {60*15}; // s
|
||||
|
||||
// FIXME: workaround missing contact events
|
||||
// only used on peer exit (no, also used to quicken lookups)
|
||||
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
|
||||
|
||||
public:
|
||||
NGCHS2Sigma(
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModelI& rmm,
|
||||
ToxContactModel2& tcm,
|
||||
NGCFT1& nft
|
||||
);
|
||||
|
||||
~NGCHS2Sigma(void);
|
||||
|
||||
float iterate(float delta);
|
||||
|
||||
void handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request&);
|
||||
|
||||
// msg reg contact
|
||||
// time ranges
|
||||
[[nodiscard]] std::vector<uint8_t> buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end);
|
||||
|
||||
protected:
|
||||
bool onEvent(const Message::Events::MessageConstruct&) override;
|
||||
bool onEvent(const Message::Events::MessageUpdated&) override;
|
||||
bool onEvent(const Message::Events::MessageDestory&) override;
|
||||
|
||||
protected:
|
||||
bool onEvent(const Events::NGCFT1_recv_request&) override;
|
||||
bool onEvent(const Events::NGCFT1_send_data&) override;
|
||||
bool onEvent(const Events::NGCFT1_send_done&) override;
|
||||
};
|
||||
|
32
solanaceae/ngc_hs2/serl.hpp
Normal file
32
solanaceae/ngc_hs2/serl.hpp
Normal file
@ -0,0 +1,32 @@
|
||||
#pragma once
|
||||
|
||||
#include <solanaceae/util/span.hpp>
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
template<typename Type>
|
||||
static uint64_t deserlSimpleType(ByteSpan bytes) {
|
||||
if (bytes.size < sizeof(Type)) {
|
||||
throw int(1);
|
||||
}
|
||||
|
||||
Type value{};
|
||||
|
||||
for (size_t i = 0; i < sizeof(Type); i++) {
|
||||
value |= Type(bytes[i]) << (i*8);
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
static uint64_t deserlTS(ByteSpan ts_bytes) {
|
||||
return deserlSimpleType<uint64_t>(ts_bytes);
|
||||
}
|
||||
|
||||
template<typename Type>
|
||||
static void serlSimpleType(std::vector<uint8_t>& bytes, const Type& value) {
|
||||
for (size_t i = 0; i < sizeof(Type); i++) {
|
||||
bytes.push_back(uint8_t(value >> (i*8) & 0xff));
|
||||
}
|
||||
}
|
||||
|
110
solanaceae/ngc_hs2/spec_ngc_hs2.md
Normal file
110
solanaceae/ngc_hs2/spec_ngc_hs2.md
Normal file
@ -0,0 +1,110 @@
|
||||
# [NGC] Group-History-Sync (v2.1) [PoC] [Draft]
|
||||
|
||||
Simple group history sync that uses `timestamp` + `peer public key` + `message_id` (`ts+ppk+mid`) to, mostly, uniquely identify messages and deliver them.
|
||||
|
||||
Messages are bundled up in a `msgpack` `array` and sent as a file transfer.
|
||||
|
||||
## Requirements
|
||||
|
||||
TODO: more?
|
||||
|
||||
### Msgpack
|
||||
|
||||
For serializing the messages.
|
||||
|
||||
### File transfers
|
||||
|
||||
For sending packs of messages.
|
||||
Even a single message can be larger than a single custom packet, so this is a must-have.
|
||||
This also allows for compression down the road.
|
||||
|
||||
## Procedure
|
||||
|
||||
Peer A can request `ts+ppk+mid+msg` list for a given time range from peer B.
|
||||
|
||||
Peer B then sends a filetransfer (with special file type) of list of `ts+ppk+mid+msg`.
|
||||
Optionally compressed. (Delta-coding? / zstd)
|
||||
|
||||
Peer A keeps doing that until the desired time span is covered.
|
||||
|
||||
During all that, peer B usually does the same thing to peer A.
|
||||
|
||||
TODO: deny request explicitly. also why (like perms and time range too large etc)
|
||||
|
||||
## Traffic savings
|
||||
|
||||
It is recomended to remember if a range has been requested and answered from a given peer, to reduce traffic.
|
||||
|
||||
While compression is optional, it is recommended.
|
||||
Timestamps fit delta coding.
|
||||
Peer keys fit dicts.
|
||||
Message ids are mostly high entropy.
|
||||
The Message itself is text, so dict/huffman fits well.
|
||||
|
||||
TODO: store the 4 coloms SoA instead of AoS ?
|
||||
|
||||
## Message uniqueness
|
||||
|
||||
This protocol relies on the randomness of `message_id` and the clocks to be more or less synchronized.
|
||||
|
||||
However, `message_id` can be manipulated freely by any peer, this can make messages appear as duplicates.
|
||||
|
||||
This can be used here, if you don't wish your messages to be syncronized (to an extent).
|
||||
|
||||
## Security
|
||||
|
||||
Only sync publicly sent/recieved messages.
|
||||
|
||||
Only allow sync or extended time ranges from peers you trust (enough).
|
||||
|
||||
The default shall be to not offer any messages.
|
||||
|
||||
Indirect messages shall be low in credibility, while direct synced (by author), with mid credibility.
|
||||
|
||||
Either only high or mid credibility shall be sent.
|
||||
|
||||
|
||||
Manual exceptions to all can be made at the users discretion, eg for other self owned devices.
|
||||
|
||||
## File transfer requests
|
||||
|
||||
TODO: is reusing the ft request api a good idea for this?
|
||||
|
||||
| fttype | name | content (ft id) |
|
||||
|------------|------|---------------------|
|
||||
| 0x00000f02 | time range msgpack | - ts start </br> - ts end |
|
||||
|
||||
## File transfer content
|
||||
|
||||
| fttype | name | content | note |
|
||||
|------------|------|----------------------------|---|
|
||||
| 0x00000f02 | time range msgpack | `message list` in msgpack | |
|
||||
|
||||
### time range msgpack
|
||||
|
||||
Msgpack array of messages.
|
||||
|
||||
```
|
||||
name | type/size | note
|
||||
-------------------------|-------------------|-----
|
||||
- array | 32bit number msgs
|
||||
- ts | 64bit deciseconds
|
||||
- ppk | 32bytes
|
||||
- mid | 16bit
|
||||
- if action |
|
||||
- action | bool
|
||||
- if text |
|
||||
- text | string | maybe byte array instead?
|
||||
- if file |
|
||||
- fkind | 32bit enum | is this right?
|
||||
- fid | bytes kind | length depends on kind
|
||||
```
|
||||
|
||||
Name is the actual string key.
|
||||
Data type sizes are suggestions, if not defined by the tox protocol.
|
||||
|
||||
## TODO
|
||||
|
||||
- [ ] figure out a pro-active approach (instead of waiting for a range request)
|
||||
- [ ] compression in the ft layer? (would make it reusable) hint/autodetect/autoenable for >1k ?
|
||||
|
82
solanaceae/ngc_hs2/test_ts_binarysearch.cpp
Normal file
82
solanaceae/ngc_hs2/test_ts_binarysearch.cpp
Normal file
@ -0,0 +1,82 @@
|
||||
#include "./ts_find_start.hpp"
|
||||
|
||||
#include <solanaceae/message3/registry_message_model.hpp>
|
||||
#include <solanaceae/message3/components.hpp>
|
||||
|
||||
#include <cassert>
|
||||
|
||||
int main(void) {
|
||||
Message3Registry msg_reg;
|
||||
|
||||
{
|
||||
std::cout << "TEST empty reg\n";
|
||||
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
|
||||
const auto res = find_start_by_ts(ts_view, 42);
|
||||
assert(res == ts_view.end());
|
||||
}
|
||||
|
||||
{
|
||||
std::cout << "TEST single msg newer (fail)\n";
|
||||
Message3Handle msg{msg_reg, msg_reg.create()};
|
||||
msg.emplace<Message::Components::Timestamp>(43ul);
|
||||
|
||||
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
|
||||
const auto res = find_start_by_ts(ts_view, 42);
|
||||
assert(res == ts_view.end());
|
||||
|
||||
msg.destroy();
|
||||
}
|
||||
|
||||
{
|
||||
std::cout << "TEST single msg same (succ)\n";
|
||||
Message3Handle msg{msg_reg, msg_reg.create()};
|
||||
msg.emplace<Message::Components::Timestamp>(42ul);
|
||||
|
||||
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
|
||||
const auto res = find_start_by_ts(ts_view, 42);
|
||||
assert(res != ts_view.end());
|
||||
|
||||
msg.destroy();
|
||||
}
|
||||
|
||||
{
|
||||
std::cout << "TEST single msg older (succ)\n";
|
||||
Message3Handle msg{msg_reg, msg_reg.create()};
|
||||
msg.emplace<Message::Components::Timestamp>(41ul);
|
||||
|
||||
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
|
||||
const auto res = find_start_by_ts(ts_view, 42);
|
||||
assert(res != ts_view.end());
|
||||
|
||||
msg.destroy();
|
||||
}
|
||||
|
||||
{
|
||||
std::cout << "TEST multi msg\n";
|
||||
Message3Handle msg{msg_reg, msg_reg.create()};
|
||||
msg.emplace<Message::Components::Timestamp>(41ul);
|
||||
Message3Handle msg2{msg_reg, msg_reg.create()};
|
||||
msg2.emplace<Message::Components::Timestamp>(42ul);
|
||||
Message3Handle msg3{msg_reg, msg_reg.create()};
|
||||
msg3.emplace<Message::Components::Timestamp>(43ul);
|
||||
|
||||
// see message3/message_time_sort.cpp
|
||||
msg_reg.sort<Message::Components::Timestamp>([](const auto& lhs, const auto& rhs) -> bool {
|
||||
return lhs.ts > rhs.ts;
|
||||
}, entt::insertion_sort{});
|
||||
|
||||
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
|
||||
auto res = find_start_by_ts(ts_view, 42);
|
||||
assert(res != ts_view.end());
|
||||
assert(*res == msg2);
|
||||
res++;
|
||||
assert(*res == msg);
|
||||
|
||||
msg3.destroy();
|
||||
msg2.destroy();
|
||||
msg.destroy();
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
31
solanaceae/ngc_hs2/ts_find_start.hpp
Normal file
31
solanaceae/ngc_hs2/ts_find_start.hpp
Normal file
@ -0,0 +1,31 @@
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstdint>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
// perform binary search to find the first message not newer than ts_start
|
||||
template<typename View>
|
||||
auto find_start_by_ts(const View& view, uint64_t ts_start) {
|
||||
//std::cout << "!!!! starting msg ts search, ts_start:" << ts_start << "\n";
|
||||
|
||||
// -> first value smaller than start ts
|
||||
auto res = std::lower_bound(
|
||||
view.begin(), view.end(),
|
||||
ts_start,
|
||||
[&view](const auto& a, const auto& b) {
|
||||
const auto& [a_comp] = view.get(a);
|
||||
return a_comp.ts > b; // > bc ts is sorted high to low?
|
||||
}
|
||||
);
|
||||
|
||||
if (res != view.end()) {
|
||||
const auto& [ts_comp] = view.get(*res);
|
||||
//std::cout << "!!!! first value not newer than start ts is " << ts_comp.ts << "\n";
|
||||
} else {
|
||||
//std::cout << "!!!! no first value not newer than start ts\n";
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user