27 Commits

Author SHA1 Message Date
0ad4c4997c limit received messages to 1min into the future 2025-04-12 23:00:32 +02:00
8673c63d3c close rw on completion 2025-03-16 11:57:52 +01:00
a094a3d574 move to read only memory mapped files 2025-03-15 19:54:45 +01:00
f5f7f2ca9d close files of inactive tranfsers 2025-03-14 13:50:30 +01:00
246587e30d make contact store version visible 2025-03-10 20:54:00 +01:00
dacda24390 port to contact4 (not using events yet) 2025-03-10 15:09:45 +01:00
7a54552bd2 print mapping error message 2025-03-03 21:08:49 +01:00
1780bd097a error checks around performing the resize
this throws on windows and causes undefined mappings on linux
2025-01-24 00:27:58 +01:00
db73c90e34 small optimizations and os backend refactor 2025-01-18 01:44:46 +01:00
a9ebaa2c2f improve perf (was ~.7% with asan enabled) 2025-01-09 01:53:17 +01:00
0064dde1ce limit provided history by first seen 2025-01-07 22:29:39 +01:00
b3b8b79a65 time moved to util 2025-01-07 16:10:28 +01:00
7cd68845ca forgot to reset timer on recv data 2024-12-19 17:17:10 +01:00
f40907d42a use actual activity for receiving transfers for iterate interval 2024-12-19 13:33:30 +01:00
6d7d643207 adjust flow to iterate speed
higher ft1sha1 iterate interval (not fixing it yet)
2024-12-16 12:52:33 +01:00
b35babe3f8 switch to os provided remote have set and other fixes 2024-12-15 01:23:08 +01:00
78390dd342 update object, object update lock and rare crash 2024-12-13 01:30:03 +01:00
eb169b2779 add participation and log spam fix 2024-12-11 23:23:05 +01:00
100279a483 fixes 2024-12-10 17:57:53 +01:00
60b3d5d941 big ft fixes, mostly for info, but also other stuff 2024-12-10 17:18:28 +01:00
6ad2905e07 hs2: change msgpack format and fixes 2024-12-09 23:38:07 +01:00
930c829031 rizzler working, more fixes everywhere
there still are some crashes that needs workarounds
2024-12-09 22:58:36 +01:00
a139f412b1 various fixes resulting in the first time running the code 2024-12-08 16:08:30 +01:00
abdf6672bf hs2 plug, rizzler still non functional 2024-12-08 14:48:24 +01:00
7bbaa9b929 move in plugin 2024-12-08 13:17:22 +01:00
70620a901b starting the work on hs2 rizzler 2024-12-07 23:34:19 +01:00
231928339e sigma 2024-12-07 11:38:31 +01:00
38 changed files with 1624 additions and 559 deletions

View File

@ -1,9 +1,21 @@
cmake_minimum_required(VERSION 3.9 FATAL_ERROR)
cmake_minimum_required(VERSION 3.24 FATAL_ERROR)
add_subdirectory(./external)
project(solanaceae)
if (CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR)
set(SOLANACEAE_NGCFT1_STANDALONE ON)
else()
set(SOLANACEAE_NGCFT1_STANDALONE OFF)
endif()
message("II SOLANACEAE_NGCFT1_STANDALONE " ${SOLANACEAE_NGCFT1_STANDALONE})
option(SOLANACEAE_NGCFT1_BUILD_PLUGINS "Build the solanaceae_ngcft1 plugins" ${SOLANACEAE_NGCFT1_BUILD_PLUGINS})
# TODO: move this stuff to src
########################################
add_library(solanaceae_ngcext
./solanaceae/ngc_ext/ngcext.hpp
./solanaceae/ngc_ext/ngcext.cpp
@ -46,7 +58,7 @@ target_link_libraries(solanaceae_ngcft1 PUBLIC
add_library(solanaceae_sha1_ngcft1
# hacky deps
./solanaceae/ngc_ft1_sha1/mio.hpp
./solanaceae/ngc_ft1_sha1/file_rw_mapped.hpp
./solanaceae/ngc_ft1_sha1/file2_mapped.hpp
./solanaceae/ngc_ft1_sha1/file_constructor.hpp
./solanaceae/ngc_ft1_sha1/file_constructor.cpp
@ -72,6 +84,9 @@ add_library(solanaceae_sha1_ngcft1
./solanaceae/ngc_ft1_sha1/participation.hpp
./solanaceae/ngc_ft1_sha1/participation.cpp
./solanaceae/ngc_ft1_sha1/file_inactivity_system.hpp
./solanaceae/ngc_ft1_sha1/file_inactivity_system.cpp
./solanaceae/ngc_ft1_sha1/re_announce_systems.hpp
./solanaceae/ngc_ft1_sha1/re_announce_systems.cpp
@ -122,13 +137,15 @@ endif()
########################################
add_library(solanaceae_ngchs2
./solanaceae/ngc_hs2/serl.hpp
./solanaceae/ngc_hs2/ts_find_start.hpp
./solanaceae/ngc_hs2/ngc_hs2_send.hpp
./solanaceae/ngc_hs2/ngc_hs2_send.cpp
./solanaceae/ngc_hs2/ngc_hs2_sigma.hpp
./solanaceae/ngc_hs2/ngc_hs2_sigma.cpp
#./solanaceae/ngc_hs2/ngc_hs2_recv.hpp
#./solanaceae/ngc_hs2/ngc_hs2_recv.cpp
./solanaceae/ngc_hs2/ngc_hs2_rizzler.hpp
./solanaceae/ngc_hs2/ngc_hs2_rizzler.cpp
)
target_include_directories(solanaceae_ngchs2 PUBLIC .)
target_compile_features(solanaceae_ngchs2 PUBLIC cxx_std_17)
@ -159,3 +176,9 @@ if (SOLANACEAE_NGCHS2_BUILD_TESTING)
endif()
########################################
if (SOLANACEAE_NGCFT1_BUILD_PLUGINS)
add_subdirectory(./plugins)
endif()

33
plugins/CMakeLists.txt Normal file
View File

@ -0,0 +1,33 @@
cmake_minimum_required(VERSION 3.9...3.24 FATAL_ERROR)
########################################
add_library(plugin_ngcft1 MODULE
./plugin_ngcft1.cpp
)
target_compile_features(plugin_ngcft1 PUBLIC cxx_std_17)
set_target_properties(plugin_ngcft1 PROPERTIES
C_VISIBILITY_PRESET hidden
)
target_compile_definitions(plugin_ngcft1 PUBLIC ENTT_API_IMPORT)
target_link_libraries(plugin_ngcft1 PUBLIC
solanaceae_plugin
solanaceae_ngcext
solanaceae_ngcft1
solanaceae_sha1_ngcft1
)
########################################
add_library(plugin_ngchs2 MODULE
./plugin_ngchs2.cpp
)
target_compile_features(plugin_ngchs2 PUBLIC cxx_std_17)
set_target_properties(plugin_ngchs2 PROPERTIES
C_VISIBILITY_PRESET hidden
)
target_compile_definitions(plugin_ngchs2 PUBLIC ENTT_API_IMPORT)
target_link_libraries(plugin_ngchs2 PUBLIC
solanaceae_plugin
solanaceae_ngchs2
)

83
plugins/plugin_ngcft1.cpp Normal file
View File

@ -0,0 +1,83 @@
#include <solanaceae/plugin/solana_plugin_v1.h>
#include <solanaceae/contact/contact_store_i.hpp>
#include <solanaceae/ngc_ext/ngcext.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
#include <solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp>
#include <entt/entt.hpp>
#include <entt/fwd.hpp>
#include <memory>
#include <iostream>
static std::unique_ptr<NGCEXTEventProvider> g_ngcextep = nullptr;
// TODO: make sep plug
static std::unique_ptr<NGCFT1> g_ngcft1 = nullptr;
static std::unique_ptr<SHA1_NGCFT1> g_sha1_ngcft1 = nullptr;
constexpr const char* plugin_name = "NGCEXT";
extern "C" {
SOLANA_PLUGIN_EXPORT const char* solana_plugin_get_name(void) {
return plugin_name;
}
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_get_version(void) {
return SOLANA_PLUGIN_VERSION;
}
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api) {
std::cout << "PLUGIN " << plugin_name << " START()\n";
if (solana_api == nullptr) {
return 1;
}
try {
auto* os = PLUG_RESOLVE_INSTANCE(ObjectStore2);
auto* tox_i = PLUG_RESOLVE_INSTANCE(ToxI);
auto* tox_event_provider_i = PLUG_RESOLVE_INSTANCE(ToxEventProviderI);
auto* cs = PLUG_RESOLVE_INSTANCE(ContactStore4I);
auto* rmm = PLUG_RESOLVE_INSTANCE(RegistryMessageModelI);
auto* tcm = PLUG_RESOLVE_INSTANCE(ToxContactModel2);
// static store, could be anywhere tho
// construct with fetched dependencies
g_ngcextep = std::make_unique<NGCEXTEventProvider>(*tox_i, *tox_event_provider_i);
g_ngcft1 = std::make_unique<NGCFT1>(*tox_i, *tox_event_provider_i, *g_ngcextep.get());
g_sha1_ngcft1 = std::make_unique<SHA1_NGCFT1>(*os, *cs, *rmm, *g_ngcft1.get(), *tcm, *tox_event_provider_i, *g_ngcextep.get());
// register types
PLUG_PROVIDE_INSTANCE(NGCEXTEventProviderI, plugin_name, g_ngcextep.get());
PLUG_PROVIDE_INSTANCE(NGCFT1EventProviderI, plugin_name, g_ngcft1.get());
PLUG_PROVIDE_INSTANCE(NGCFT1, plugin_name, g_ngcft1.get());
PLUG_PROVIDE_INSTANCE(SHA1_NGCFT1, plugin_name, g_sha1_ngcft1.get());
} catch (const ResolveException& e) {
std::cerr << "PLUGIN " << plugin_name << " " << e.what << "\n";
return 2;
}
return 0;
}
SOLANA_PLUGIN_EXPORT void solana_plugin_stop(void) {
std::cout << "PLUGIN " << plugin_name << " STOP()\n";
g_sha1_ngcft1.reset();
g_ngcft1.reset();
g_ngcextep.reset();
}
SOLANA_PLUGIN_EXPORT float solana_plugin_tick(float delta) {
const float ft_interval = g_ngcft1->iterate(delta);
const float sha_interval = g_sha1_ngcft1->iterate(delta);
return std::min<float>(ft_interval, sha_interval);
}
} // extern C

80
plugins/plugin_ngchs2.cpp Normal file
View File

@ -0,0 +1,80 @@
#include <solanaceae/plugin/solana_plugin_v1.h>
#include <solanaceae/contact/contact_store_i.hpp>
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
#include <solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp> // this hurts
#include <solanaceae/ngc_hs2/ngc_hs2_sigma.hpp>
#include <solanaceae/ngc_hs2/ngc_hs2_rizzler.hpp>
#include <entt/entt.hpp>
#include <entt/fwd.hpp>
#include <memory>
#include <iostream>
// https://youtu.be/OwT83dN82pc
static std::unique_ptr<NGCHS2Sigma> g_ngchs2s = nullptr;
static std::unique_ptr<NGCHS2Rizzler> g_ngchs2r = nullptr;
constexpr const char* plugin_name = "NGCHS2";
extern "C" {
SOLANA_PLUGIN_EXPORT const char* solana_plugin_get_name(void) {
return plugin_name;
}
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_get_version(void) {
return SOLANA_PLUGIN_VERSION;
}
SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api) {
std::cout << "PLUGIN " << plugin_name << " START()\n";
if (solana_api == nullptr) {
return 1;
}
try {
//auto* tox_i = PLUG_RESOLVE_INSTANCE(ToxI);
auto* tox_event_provider_i = PLUG_RESOLVE_INSTANCE(ToxEventProviderI);
auto* cs = PLUG_RESOLVE_INSTANCE(ContactStore4I);
auto* rmm = PLUG_RESOLVE_INSTANCE(RegistryMessageModelI);
auto* tcm = PLUG_RESOLVE_INSTANCE(ToxContactModel2);
auto* ngcft1 = PLUG_RESOLVE_INSTANCE(NGCFT1);
auto* sha1_ngcft1 = PLUG_RESOLVE_INSTANCE(SHA1_NGCFT1);
// static store, could be anywhere tho
// construct with fetched dependencies
g_ngchs2s = std::make_unique<NGCHS2Sigma>(*cs, *rmm, *tcm, *ngcft1);
g_ngchs2r = std::make_unique<NGCHS2Rizzler>(*cs, *rmm, *tcm, *ngcft1, *tox_event_provider_i, *sha1_ngcft1);
// register types
PLUG_PROVIDE_INSTANCE(NGCHS2Sigma, plugin_name, g_ngchs2s.get());
PLUG_PROVIDE_INSTANCE(NGCHS2Rizzler, plugin_name, g_ngchs2r.get());
} catch (const ResolveException& e) {
std::cerr << "PLUGIN " << plugin_name << " " << e.what << "\n";
return 2;
}
return 0;
}
SOLANA_PLUGIN_EXPORT void solana_plugin_stop(void) {
std::cout << "PLUGIN " << plugin_name << " STOP()\n";
g_ngchs2r.reset();
g_ngchs2s.reset();
}
SOLANA_PLUGIN_EXPORT float solana_plugin_tick(float delta) {
const float sigma_interval = g_ngchs2s->iterate(delta);
const float rizzler_interval = g_ngchs2r->iterate(delta);
return std::min<float>(sigma_interval, rizzler_interval);
}
} // extern C

View File

@ -7,7 +7,9 @@
float FlowOnly::getCurrentDelay(void) const {
// below 1ms is useless
return std::clamp(_rtt_ema, 0.001f, RTT_MAX);
//return std::clamp(_rtt_ema, 0.001f, RTT_MAX);
// the current iterate rate min is 5ms
return std::clamp(_rtt_ema, 0.005f, RTT_MAX);
}
void FlowOnly::addRTT(float new_delay) {

View File

@ -171,7 +171,7 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
}
}
void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
bool NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) {
if (peer.cca) {
auto timeouts = peer.cca->getTimeouts();
std::set<CCAI::SeqIDType> timeouts_set{timeouts.cbegin(), timeouts.cend()};
@ -202,6 +202,7 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
}
}
bool recv_activity {false};
for (size_t idx = 0; idx < peer.recv_transfers.size(); idx++) {
if (!peer.recv_transfers.at(idx).has_value()) {
continue;
@ -211,20 +212,21 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
// proper switch case?
if (transfer.state == Group::Peer::RecvTransfer::State::FINISHING) {
transfer.finishing_timer -= time_delta;
if (transfer.finishing_timer <= 0.f) {
//dispatch(
// NGCFT1_Event::recv_done,
// Events::NGCFT1_recv_done{
// group_number, peer_number,
// uint8_t(idx)
// }
//);
transfer.timer -= time_delta;
if (transfer.timer <= 0.f) {
peer.recv_transfers.at(idx).reset();
}
recv_activity = true; // count as activity, not sure we need this
} else {
transfer.timer += time_delta;
if (transfer.timer < 0.5f) {
// back off when no activity
recv_activity = true;
}
}
}
return peer.active_send_transfers > 0 || recv_activity;
}
const CCAI* NGCFT1::getPeerCCA(
@ -270,37 +272,61 @@ NGCFT1::NGCFT1(
float NGCFT1::iterate(float time_delta) {
_time_since_activity += time_delta;
bool transfer_in_progress {false};
bool transfer_activity {false};
for (auto& [group_number, group] : groups) {
for (auto& [peer_number, peer] : group.peers) {
iteratePeer(time_delta, group_number, peer_number, peer);
transfer_activity = transfer_activity || iteratePeer(time_delta, group_number, peer_number, peer);
#if 0
// find any active transfer
if (!transfer_in_progress) {
if (!transfer_activity) {
for (const auto& t : peer.send_transfers) {
if (t.has_value()) {
transfer_in_progress = true;
transfer_activity = true;
#if 0
std::cout
<< "--- active send transfer "
<< group_number << ":" << peer_number
<< "(" << std::get<0>(_t.toxGroupPeerGetName(group_number, peer_number)).value_or("<unk>") << ")"
<< " fk:" << t.value().file_kind
<< " state:" << (int)t.value().state
<< " tsa:" << t.value().time_since_activity
<< "\n"
;
#endif
break;
}
}
}
if (!transfer_in_progress) {
if (!transfer_activity) {
for (const auto& t : peer.recv_transfers) {
if (t.has_value()) {
transfer_in_progress = true;
transfer_activity = true;
#if 0
std::cout
<< "--- active recv transfer "
<< group_number << ":" << peer_number
<< "(" << std::get<0>(_t.toxGroupPeerGetName(group_number, peer_number)).value_or("<unk>") << ")"
<< " fk:" << t.value().file_kind
<< " state:" << (int)t.value().state
<< " ft:" << t.value().finishing_timer
<< "\n"
;
#endif
break;
}
}
}
#endif
}
}
if (transfer_in_progress) {
if (transfer_activity) {
_time_since_activity = 0.f;
// ~15ms for up to 1mb/s
// ~5ms for up to 4mb/s
return 0.005f; // 5ms
} else if (_time_since_activity < 0.5f) {
} else if (_time_since_activity < 1.0f) {
// bc of temporality
return 0.025f;
} else {
@ -308,13 +334,12 @@ float NGCFT1::iterate(float time_delta) {
}
}
void NGCFT1::NGC_FT1_send_request_private(
bool NGCFT1::NGC_FT1_send_request_private(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, uint32_t file_id_size
) {
// TODO: error check
_neep.send_ft1_request(group_number, peer_number, file_kind, file_id, file_id_size);
return _neep.send_ft1_request(group_number, peer_number, file_kind, file_id, file_id_size);
}
bool NGCFT1::NGC_FT1_send_init_private(
@ -458,44 +483,6 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
//#if !NDEBUG
std::cout << "NGCFT1: got FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << int(e.transfer_id) << " [" << bin2hex(e.file_id) << "]\n";
//#endif
#if 0
bool accept = false;
dispatch(
NGCFT1_Event::recv_init,
Events::NGCFT1_recv_init{
e.group_number, e.peer_number,
static_cast<NGCFT1_file_kind>(e.file_kind),
e.file_id.data(), static_cast<uint32_t>(e.file_id.size()),
e.transfer_id,
e.file_size,
accept
}
);
if (!accept) {
std::cout << "NGCFT1: rejected init\n";
return true; // return true?
}
_neep.send_ft1_init_ack(e.group_number, e.peer_number, e.transfer_id);
std::cout << "NGCFT1: accepted init\n";
auto& peer = groups[e.group_number].peers[e.peer_number];
if (peer.recv_transfers[e.transfer_id].has_value()) {
std::cerr << "NGCFT1 warning: overwriting existing recv_transfer " << int(e.transfer_id) << ", other peer started new transfer on preexising\n";
}
peer.recv_transfers[e.transfer_id] = Group::Peer::RecvTransfer{
e.file_kind,
e.file_id,
Group::Peer::RecvTransfer::State::INITED,
e.file_size,
0u,
{} // rsb
};
return true;
#else
// HACK: simply forward to init2 hanlder
return onEvent(Events::NGCEXT_ft1_init2{
e.group_number,
@ -506,12 +493,11 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) {
0x00, // non set
e.file_id, // sadly a copy, wont matter in the future
});
#endif
}
bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) {
//#if !NDEBUG
std::cout << "NGCFT1: got FT1_INIT_ACK mds:" << e.max_lossy_data_size << "\n";
std::cout << "NGCFT1: got FT1_INIT_ACK " << e.group_number << ":" << e.peer_number << " mds:" << e.max_lossy_data_size << "\n";
//#endif
// we now should start sending data
@ -589,6 +575,7 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
}
auto& transfer = peer.recv_transfers[e.transfer_id].value();
transfer.timer = 0.f;
// do reassembly, ignore dups
transfer.rsb.add(e.sequence_id, std::vector<uint8_t>(e.data)); // TODO: ugly explicit copy for what should just be a move
@ -627,8 +614,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
// TODO: keep around for remote timeout + delay + offset, so we can be sure all acks where received
// or implement a dedicated finished that needs to be acked
//transfer.finishing_timer = 0.75f; // TODO: we are receiving, we dont know delay
transfer.finishing_timer = FlowOnly::RTT_MAX;
//transfer.timer = 0.75f; // TODO: we are receiving, we dont know delay
transfer.timer = FlowOnly::RTT_MAX;
dispatch(
NGCFT1_Event::recv_done,

View File

@ -156,16 +156,17 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
std::vector<uint8_t> file_id;
enum class State {
INITED, //init acked, but no data received yet (might be dropped)
RECV, // receiving data
FINISHING, // got all the data, but we wait for 2*delay, since its likely there is data still arriving
INITED, // init acked, but no data received yet (might be dropped)
RECV, // receiving data
FINISHING, // got all the data, but we wait for 2*delay, since its likely there is data still arriving
} state;
uint64_t file_size {0};
uint64_t file_size_current {0};
// if state INITED or RECV, time since last activity
// if state FINISHING and it reaches 0, delete
float finishing_timer {0.f};
float timer {0.f};
// sequence id based reassembly
RecvSequenceBuffer rsb;
@ -209,7 +210,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
protected:
void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
bool iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
const CCAI* getPeerCCA(uint32_t group_number, uint32_t peer_number) const;
@ -223,7 +224,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
float iterate(float delta);
public: // ft1 api
void NGC_FT1_send_request_private(
bool NGC_FT1_send_request_private(
uint32_t group_number, uint32_t peer_number,
uint32_t file_kind,
const uint8_t* file_id, uint32_t file_id_size

View File

@ -25,7 +25,7 @@ bool RecvSequenceBuffer::canPop(void) const {
std::vector<uint8_t> RecvSequenceBuffer::pop(void) {
assert(canPop());
auto tmp_data = entries.at(next_seq_id).data;
auto tmp_data = std::move(entries.at(next_seq_id).data);
erase(next_seq_id);
next_seq_id++;
return tmp_data;

View File

@ -22,34 +22,35 @@ namespace Backends {
struct SHA1MappedFilesystem_InfoBuilderState {
std::atomic_bool info_builder_dirty {false};
std::mutex info_builder_queue_mutex;
using InfoBuilderEntry = std::function<void(void)>;
using InfoBuilderEntry = std::function<void(float)>;
std::list<InfoBuilderEntry> info_builder_queue;
};
SHA1MappedFilesystem::SHA1MappedFilesystem(
ObjectStore2& os
) : StorageBackendI::StorageBackendI(os), _ibs(std::make_unique<SHA1MappedFilesystem_InfoBuilderState>()) {
) : _os(os), _ibs(std::make_unique<SHA1MappedFilesystem_InfoBuilderState>()) {
}
SHA1MappedFilesystem::~SHA1MappedFilesystem(void) {
}
void SHA1MappedFilesystem::tick(void) {
void SHA1MappedFilesystem::tick(float current_time) {
if (_ibs->info_builder_dirty) {
std::lock_guard l{_ibs->info_builder_queue_mutex};
_ibs->info_builder_dirty = false; // set while holding lock
for (auto& it : _ibs->info_builder_queue) {
it();
it(current_time);
}
_ibs->info_builder_queue.clear();
}
}
ObjectHandle SHA1MappedFilesystem::newObject(ByteSpan id) {
ObjectHandle SHA1MappedFilesystem::newObject(ByteSpan id, bool throw_construct) {
ObjectHandle o{_os.registry(), _os.registry().create()};
o.emplace<ObjComp::Ephemeral::Backend>(this);
o.emplace<ObjComp::Ephemeral::BackendMeta>(this);
o.emplace<ObjComp::Ephemeral::BackendFile2>(this);
o.emplace<ObjComp::ID>(std::vector<uint8_t>{id});
//o.emplace<ObjComp::Ephemeral::FilePath>(object_file_path.generic_u8string());
@ -67,11 +68,11 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
file_path_ = std::string(file_path)
]() mutable {
// 0. open and fail
std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(file_path_, -1);
std::unique_ptr<File2I> file_impl = construct_file2_r_mapped(file_path_);
if (!file_impl->isGood()) {
{
std::lock_guard l{ibs->info_builder_queue_mutex};
ibs->info_builder_queue.push_back([file_path_](){
ibs->info_builder_queue.push_back([file_path_](float){
// back on iterate thread
std::cerr << "SHA1MF error: failed opening file '" << file_path_ << "'!\n";
@ -119,11 +120,12 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
file_path_,
sha1_info = std::move(sha1_info),
cb = std::move(cb)
]() mutable { //
](float current_time) mutable { //
// executed on iterate thread
// reopen, cant move, since std::function needs to be copy consturctable (meh)
std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(file_path_, sha1_info.file_size);
std::unique_ptr<File2I> file_impl = construct_file2_r_mapped(file_path_);
//std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(file_path_, sha1_info.file_size);
if (!file_impl->isGood()) {
std::cerr << "SHA1MF error: failed opening file '" << file_path_ << "'!\n";
return;
@ -195,7 +197,7 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
o.emplace_or_replace<ObjComp::Ephemeral::FilePath>(file_path_); // ?
}
o.emplace_or_replace<Components::FT1File2>(std::move(file_impl));
o.emplace_or_replace<Components::FT1File2>(std::move(file_impl), current_time);
if (!o.all_of<ObjComp::Ephemeral::File::TransferStats>()) {
o.emplace<ObjComp::Ephemeral::File::TransferStats>();
@ -233,15 +235,22 @@ std::unique_ptr<File2I> SHA1MappedFilesystem::file2(Object ov, FILE2_FLAGS flags
return nullptr;
}
// TODO: read-only one too
// since they are mapped, is this efficent to have multiple?
auto res = construct_file2_rw_mapped(file_path, -1);
if (!res || !res->isGood()) {
std::cerr << "SHA1MF error: failed constructing mapped file '" << file_path << "'\n";
return nullptr;
if (flags & FILE2_WRITE) {
auto res = construct_file2_rw_mapped(file_path, -1);
if (!res || !res->isGood()) {
std::cerr << "SHA1MF error: failed constructing mapped RW file '" << file_path << "'\n";
return nullptr;
}
return res;
} else { // read
auto res = construct_file2_r_mapped(file_path);
if (!res || !res->isGood()) {
std::cerr << "SHA1MF error: failed constructing mapped R file '" << file_path << "'\n";
return nullptr;
}
return res;
}
return res;
}
} // Backends

View File

@ -11,7 +11,8 @@ namespace Backends {
// fwd to hide the threading headers
struct SHA1MappedFilesystem_InfoBuilderState;
struct SHA1MappedFilesystem : public StorageBackendI {
struct SHA1MappedFilesystem : public StorageBackendIMeta, public StorageBackendIFile2 {
ObjectStore2& _os;
std::unique_ptr<SHA1MappedFilesystem_InfoBuilderState> _ibs;
SHA1MappedFilesystem(
@ -21,9 +22,9 @@ struct SHA1MappedFilesystem : public StorageBackendI {
// pull from info builder queue
// call from main thread (os thread?) often
void tick(void);
void tick(float current_time);
ObjectHandle newObject(ByteSpan id) override;
ObjectHandle newObject(ByteSpan id, bool throw_construct = true) override;
// performs async file hashing
// create message in cb

View File

@ -123,7 +123,7 @@ static constexpr size_t flowWindowToRequestCount(size_t flow_window) {
}
void ChunkPicker::updateParticipation(
Contact3Handle c,
ContactHandle4 c,
ObjectRegistry& objreg
) {
if (!c.all_of<Contact::Components::FT1Participation>()) {
@ -211,7 +211,7 @@ void ChunkPicker::updateParticipation(
}
std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
Contact3Handle c,
ContactHandle4 c,
ObjectRegistry& objreg,
const ReceivingTransfers& rt,
const size_t open_requests
@ -272,7 +272,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
ObjectHandle o {objreg, it->first};
// intersect self have with other have
if (!o.all_of<Components::RemoteHaveBitset, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
if (!o.all_of<ObjComp::F::RemoteHaveBitset, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
// rare case where no one else has anything
continue;
}
@ -284,7 +284,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
//const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
const auto& others_have = o.get<Components::RemoteHaveBitset>().others;
const auto& others_have = o.get<ObjComp::F::RemoteHaveBitset>().others;
auto other_it = others_have.find(c);
if (other_it == others_have.end()) {
// rare case where the other is participating but has nothing

View File

@ -1,6 +1,6 @@
#pragma once
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/contact/fwd.hpp>
#include <solanaceae/object_store/object_store.hpp>
#include "./components.hpp"
@ -50,7 +50,7 @@ struct ChunkPicker {
private: // TODO: properly sort
// updates participating_unfinished
void updateParticipation(
Contact3Handle c,
ContactHandle4 c,
ObjectRegistry& objreg
);
public:
@ -66,7 +66,7 @@ struct ChunkPicker {
};
// returns list of chunks to request
[[nodiscard]] std::vector<ContentChunkR> updateChunkRequests(
Contact3Handle c,
ContactHandle4 c,
ObjectRegistry& objreg,
const ReceivingTransfers& rt,
const size_t open_requests

View File

@ -1,5 +1,7 @@
#include "./chunk_picker_systems.hpp"
#include <solanaceae/contact/contact_store_i.hpp>
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
#include "./components.hpp"
@ -12,17 +14,19 @@
namespace Systems {
void chunk_picker_updates(
Contact3Registry& cr,
ContactStore4I& cs,
ObjectRegistry& os_reg,
const entt::dense_map<Contact3, size_t>& peer_open_requests,
const entt::dense_map<Contact4, size_t>& peer_open_requests,
const ReceivingTransfers& receiving_transfers,
NGCFT1& nft, // TODO: remove this somehow
const float delta
) {
std::vector<Contact3Handle> cp_to_remove;
std::vector<ContactHandle4> cp_to_remove;
auto& cr = cs.registry();
// first, update timers
cr.view<ChunkPickerTimer>().each([&cr, delta](const Contact3 cv, ChunkPickerTimer& cpt) {
cr.view<ChunkPickerTimer>().each([&cr, delta](const Contact4 cv, ChunkPickerTimer& cpt) {
cpt.timer -= delta;
if (cpt.timer <= 0.f) {
cr.emplace_or_replace<ChunkPickerUpdateTag>(cv);
@ -33,8 +37,8 @@ void chunk_picker_updates(
// now check for potentially missing cp
auto cput_view = cr.view<ChunkPickerUpdateTag>();
cput_view.each([&cr, &cp_to_remove](const Contact3 cv) {
Contact3Handle c{cr, cv};
cput_view.each([&cr, &cp_to_remove](const Contact4 cv) {
ContactHandle4 c{cr, cv};
//std::cout << "cput :)\n";
@ -52,8 +56,8 @@ void chunk_picker_updates(
});
// now update all cp that are tagged
cr.view<ChunkPicker, ChunkPickerUpdateTag>().each([&cr, &os_reg, &peer_open_requests, &receiving_transfers, &nft, &cp_to_remove](const Contact3 cv, ChunkPicker& cp) {
Contact3Handle c{cr, cv};
cr.view<ChunkPicker, ChunkPickerUpdateTag>().each([&cr, &os_reg, &peer_open_requests, &receiving_transfers, &nft, &cp_to_remove](const Contact4 cv, ChunkPicker& cp) {
ContactHandle4 c{cr, cv};
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral, Contact::Components::FT1Participation>()) {
cp_to_remove.push_back(c);

View File

@ -1,6 +1,6 @@
#pragma once
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/contact/fwd.hpp>
#include <solanaceae/object_store/object_store.hpp>
#include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
@ -10,9 +10,9 @@
namespace Systems {
void chunk_picker_updates(
Contact3Registry& cr,
ContactStore4I& cs,
ObjectRegistry& os_reg,
const entt::dense_map<Contact3, size_t>& peer_open_requests,
const entt::dense_map<Contact4, size_t>& peer_open_requests,
const ReceivingTransfers& receiving_transfers,
NGCFT1& nft, // TODO: remove this somehow
const float delta

View File

@ -49,7 +49,8 @@ void ReAnnounceTimer::reset(void) {
void ReAnnounceTimer::lower(void) {
timer *= 0.1f;
last_max *= 0.1f;
//last_max *= 0.1f; // is this a good idea?
last_max *= 0.9f; // is this a good idea?
}
void TransferStatsTally::Peer::trimSent(const float time_now) {

View File

@ -54,28 +54,23 @@ namespace Components {
// the cached file2 for faster access
// should be destroyed when no activity and recreated on demand
std::unique_ptr<File2I> file;
// set to current time on init, read, write
float last_activity_ts {0.f};
};
struct FT1ChunkSHA1Requested {
// requested chunks with a timer since last request
struct Entry {
float timer {0.f};
Contact3 c {entt::null};
Contact4 c {entt::null};
};
entt::dense_map<size_t, Entry> chunks;
};
// TODO: once announce is shipped, remove the "Suspected"
struct SuspectedParticipants {
entt::dense_set<Contact3> participants;
};
struct RemoteHaveBitset {
struct Entry {
bool have_all {false};
BitSet have;
};
entt::dense_map<Contact3, Entry> others;
entt::dense_set<Contact4> participants;
};
struct ReRequestInfoTimer {
@ -83,7 +78,7 @@ namespace Components {
};
struct AnnounceTargets {
entt::dense_set<Contact3> targets;
entt::dense_set<Contact4> targets;
};
struct ReAnnounceTimer {
@ -100,7 +95,7 @@ namespace Components {
};
struct TransferStatsSeparated {
entt::dense_map<Contact3, ObjComp::Ephemeral::File::TransferStats> stats;
entt::dense_map<Contact4, ObjComp::Ephemeral::File::TransferStats> stats;
};
// used to populate stats
@ -119,7 +114,7 @@ namespace Components {
void trimSent(const float time_now);
void trimReceived(const float time_now);
};
entt::dense_map<Contact3, Peer> tally;
entt::dense_map<Contact4, Peer> tally;
};
} // Components

View File

@ -0,0 +1,146 @@
#pragma once
#include <solanaceae/file/file2.hpp>
#include "./mio.hpp"
#include <filesystem>
#include <fstream>
#include <iostream>
#include <cstring>
#include <cassert>
struct File2RWMapped : public File2I {
mio::ummap_sink _file_map;
// TODO: add truncate support?
// TODO: rw always true?
File2RWMapped(std::string_view file_path, int64_t file_size = -1) : File2I(true, true) {
std::filesystem::path native_file_path{file_path};
if (!std::filesystem::exists(native_file_path)) {
std::ofstream(native_file_path) << '\0'; // force create the file
}
_file_size = std::filesystem::file_size(native_file_path);
if (file_size >= 0 && _file_size != file_size) {
try {
std::filesystem::resize_file(native_file_path, file_size); // ensure size, usually sparse
} catch (...) {
std::cerr << "FileRWMapped error: resizing file failed\n";
return;
}
_file_size = std::filesystem::file_size(native_file_path);
if (_file_size != file_size) {
std::cerr << "FileRWMapped error: resizing file failed (size mismatch)\n";
return;
}
}
std::error_code err;
// sink, is also read
_file_map.map(native_file_path.u8string(), 0, _file_size, err);
if (err) {
std::cerr << "FileRWMapped error: mapping file failed: " << err.message() << " (" << err << ")\n";
return;
}
}
virtual ~File2RWMapped(void) {
}
bool isGood(void) override {
return _file_map.is_mapped();
}
bool write(const ByteSpan data, int64_t pos = -1) override {
// TODO: support streaming write
if (pos < 0) {
return false;
}
if (data.empty()) {
return true; // false?
}
// file size is fix for mmaped files
if (pos+data.size > _file_size) {
return false;
}
std::memcpy(_file_map.data()+pos, data.ptr, data.size);
return true;
}
ByteSpanWithOwnership read(uint64_t size, int64_t pos = -1) override {
// TODO: support streaming read
if (pos < 0) {
assert(false && "streaming not implemented");
return ByteSpan{};
}
if (pos+size > _file_size) {
assert(false && "read past end");
return ByteSpan{};
}
// return non-owning
return ByteSpan{_file_map.data()+pos, size};
}
};
struct File2RMapped : public File2I {
mio::ummap_source _file_map;
File2RMapped(std::string_view file_path) : File2I(false, true) {
std::filesystem::path native_file_path{file_path};
if (!std::filesystem::exists(native_file_path)) {
std::cerr << "FileRMapped error: file does not exist\n";
return;
}
_file_size = std::filesystem::file_size(native_file_path);
std::error_code err;
_file_map.map(native_file_path.u8string(), err);
if (err) {
std::cerr << "FileRMapped error: mapping file failed: " << err.message() << " (" << err << ")\n";
return;
}
if (_file_size != _file_map.length()) {
std::cerr << "FileRMapped warning: file size and mapped file size mismatch.\n";
_file_size = _file_map.length();
}
}
virtual ~File2RMapped(void) {
}
bool isGood(void) override {
return _file_map.is_mapped();
}
bool write(const ByteSpan, int64_t = -1) override { return false; }
ByteSpanWithOwnership read(uint64_t size, int64_t pos = -1) override {
// TODO: support streaming read
if (pos < 0) {
assert(false && "streaming not implemented");
return ByteSpan{};
}
if (pos+size > _file_size) {
assert(false && "read past end");
return ByteSpan{};
}
// return non-owning
return ByteSpan{_file_map.data()+pos, size};
}
};

View File

@ -1,8 +1,12 @@
#include "./file_constructor.hpp"
#include "./file_rw_mapped.hpp"
#include "./file2_mapped.hpp"
std::unique_ptr<File2I> construct_file2_rw_mapped(std::string_view file_path, int64_t file_size) {
return std::make_unique<File2RWMapped>(file_path, file_size);
}
std::unique_ptr<File2I> construct_file2_r_mapped(std::string_view file_path) {
return std::make_unique<File2RMapped>(file_path);
}

View File

@ -6,4 +6,5 @@
#include <string_view>
std::unique_ptr<File2I> construct_file2_rw_mapped(std::string_view file_path, int64_t file_size = -1);
std::unique_ptr<File2I> construct_file2_r_mapped(std::string_view file_path);

View File

@ -0,0 +1,33 @@
#include "./file_inactivity_system.hpp"
#include <solanaceae/object_store/object_store.hpp>
#include <solanaceae/file/file2.hpp>
#include "./components.hpp"
#include <iostream>
namespace Systems {
void file_inactivity(
ObjectRegistry& os_reg,
float current_time
) {
std::vector<Object> to_close;
size_t total {0};
os_reg.view<Components::FT1File2>().each([&os_reg, &to_close, &total, current_time](Object ov, const Components::FT1File2& ft_f) {
if (current_time - ft_f.last_activity_ts >= 30.f) {
// after 30sec of inactivity
to_close.push_back(ov);
}
total++;
});
if (!to_close.empty()) {
std::cout << "SHA1_NGCFT1: closing " << to_close.size() << " out of " << total << " open files\n";
os_reg.remove<Components::FT1File2>(to_close.cbegin(), to_close.cend());
}
}
} // Systems

View File

@ -0,0 +1,13 @@
#pragma once
#include <solanaceae/object_store/fwd.hpp>
namespace Systems {
void file_inactivity(
ObjectRegistry& os_reg,
float current_time
);
} // Systems

View File

@ -1,83 +0,0 @@
#pragma once
#include <solanaceae/file/file2.hpp>
#include "./mio.hpp"
#include <filesystem>
#include <fstream>
#include <iostream>
#include <cstring>
#include <cassert>
struct File2RWMapped : public File2I {
mio::ummap_sink _file_map;
// TODO: add truncate support?
// TODO: rw always true?
File2RWMapped(std::string_view file_path, int64_t file_size = -1) : File2I(true, true) {
std::filesystem::path native_file_path{file_path};
if (!std::filesystem::exists(native_file_path)) {
std::ofstream(native_file_path) << '\0'; // force create the file
}
_file_size = std::filesystem::file_size(native_file_path);
if (file_size >= 0 && _file_size != file_size) {
_file_size = file_size;
std::filesystem::resize_file(native_file_path, file_size); // ensure size, usually sparse
}
std::error_code err;
// sink, is also read
_file_map.map(native_file_path.u8string(), 0, _file_size, err);
if (err) {
std::cerr << "FileRWMapped error: mapping file failed " << err << "\n";
return;
}
}
virtual ~File2RWMapped(void) override {}
bool isGood(void) override {
return _file_map.is_mapped();
}
bool write(const ByteSpan data, int64_t pos = -1) override {
// TODO: support streaming write
if (pos < 0) {
return false;
}
if (data.empty()) {
return true; // false?
}
// file size is fix for mmaped files
if (pos+data.size > _file_size) {
return false;
}
std::memcpy(_file_map.data()+pos, data.ptr, data.size);
return true;
}
ByteSpanWithOwnership read(uint64_t size, int64_t pos = -1) override {
// TODO: support streaming read
if (pos < 0) {
assert(false && "streaming not implemented");
return ByteSpan{};
}
if (pos+size > _file_size) {
assert(false && "read past end");
return ByteSpan{};
}
// return non-owning
return ByteSpan{_file_map.data()+pos, size};
}
};

View File

@ -5,7 +5,7 @@
#include <iostream>
bool addParticipation(Contact3Handle c, ObjectHandle o) {
bool addParticipation(ContactHandle4 c, ObjectHandle o) {
bool was_new {false};
assert(static_cast<bool>(o));
assert(static_cast<bool>(c));
@ -25,7 +25,7 @@ bool addParticipation(Contact3Handle c, ObjectHandle o) {
return was_new;
}
void removeParticipation(Contact3Handle c, ObjectHandle o) {
void removeParticipation(ContactHandle4 c, ObjectHandle o) {
assert(static_cast<bool>(o));
assert(static_cast<bool>(c));

View File

@ -1,8 +1,8 @@
#pragma once
#include <solanaceae/object_store/object_store.hpp>
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/contact/fwd.hpp>
bool addParticipation(Contact3Handle c, ObjectHandle o);
void removeParticipation(Contact3Handle c, ObjectHandle o);
bool addParticipation(ContactHandle4 c, ObjectHandle o);
void removeParticipation(ContactHandle4 c, ObjectHandle o);

View File

@ -2,6 +2,7 @@
#include "./components.hpp"
#include <solanaceae/object_store/meta_components_file.hpp>
#include <solanaceae/contact/contact_store_i.hpp>
#include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
#include <vector>
@ -11,13 +12,19 @@ namespace Systems {
void re_announce(
ObjectRegistry& os_reg,
Contact3Registry& cr,
ContactStore4I& cs,
NGCEXTEventProvider& neep,
const float delta
) {
std::vector<Object> to_remove;
os_reg.view<Components::ReAnnounceTimer>().each([&os_reg, &cr, &neep, &to_remove, delta](Object ov, Components::ReAnnounceTimer& rat) {
os_reg.view<Components::ReAnnounceTimer>().each([&os_reg, &cr = cs.registry(), &neep, &to_remove, delta](Object ov, Components::ReAnnounceTimer& rat) {
ObjectHandle o{os_reg, ov};
// if no known targets, or no hash, remove
if (!o.all_of<Components::AnnounceTargets, Components::FT1InfoSHA1Hash>()) {
to_remove.push_back(ov);
return;
}
// TODO: pause
//// if paused -> remove
//if (o.all_of<Message::Components::Transfer::TagPaused>()) {
@ -25,10 +32,11 @@ void re_announce(
// return;
//}
// if not downloading or info incomplete -> remove
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) {
// // if not downloading or info incomplete -> remove
//if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) {
// if not downloading AND info complete -> remove
if (!o.all_of<Components::FT1ChunkSHA1Cache>() && o.all_of<Components::FT1InfoSHA1Data>()) {
to_remove.push_back(ov);
assert(false && "transfer in broken state");
return;
}

View File

@ -1,14 +1,14 @@
#pragma once
#include <solanaceae/object_store/object_store.hpp>
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/contact/fwd.hpp>
#include <solanaceae/ngc_ext/ngcext.hpp>
namespace Systems {
void re_announce(
ObjectRegistry& os_reg,
Contact3Registry& cr,
ContactStore4I& cs,
NGCEXTEventProvider& neep,
const float delta
);

View File

@ -37,7 +37,7 @@ SendingTransfers::Entry& SendingTransfers::emplaceInfo(uint32_t group_number, ui
}
SendingTransfers::Entry& SendingTransfers::emplaceChunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Chunk& chunk) {
assert(!containsPeerChunk(group_number, peer_number, chunk.content, chunk.chunk_index));
assert(!containsPeerChunk(group_number, peer_number, chunk.o, chunk.chunk_index));
auto& ent = _data[combine_ids(group_number, peer_number)][transfer_id];
ent.v = chunk;
return ent;
@ -60,7 +60,7 @@ bool SendingTransfers::containsChunk(ObjectHandle o, size_t chunk_idx) const {
}
const auto& c = v.getChunk();
if (c.content != o) {
if (c.o != o) {
continue;
}
@ -85,7 +85,7 @@ bool SendingTransfers::containsPeerChunk(uint32_t group_number, uint32_t peer_nu
}
const auto& c = v.getChunk();
if (c.content != o) {
if (c.o != o) {
continue;
}

View File

@ -19,7 +19,7 @@ struct SendingTransfers {
};
struct Chunk {
ObjectHandle content;
ObjectHandle o;
size_t chunk_index; // <.< remove offset_into_file
//uint64_t offset_into_file;
// or data?

View File

@ -1,6 +1,9 @@
#include "./sha1_ngcft1.hpp"
#include <solanaceae/util/utils.hpp>
#include <solanaceae/util/time.hpp>
#include <solanaceae/contact/contact_store_i.hpp>
#include <solanaceae/contact/components.hpp>
#include <solanaceae/tox_contacts/components.hpp>
@ -27,6 +30,7 @@
#include "./re_announce_systems.hpp"
#include "./chunk_picker_systems.hpp"
#include "./transfer_stats_systems.hpp"
#include "./file_inactivity_system.hpp"
#include <iostream>
#include <filesystem>
@ -69,6 +73,12 @@ void SHA1_NGCFT1::updateMessages(ObjectHandle o) {
assert(o.all_of<Components::Messages>());
for (auto msg : o.get<Components::Messages>().messages) {
// FIXME: hs might create and destory messages for objects without events
// we should really do garbage collection
if (!msg) {
continue;
}
msg.emplace_or_replace<Message::Components::MessageFileObject>(o);
// messages no long hold this info
@ -82,66 +92,68 @@ void SHA1_NGCFT1::updateMessages(ObjectHandle o) {
std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ObjectHandle ce) {
// get a list of peers we can request this file from
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
// TODO: sort by con state?
// prio to direct?
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
const auto& cr = _cs.registry();
// 1 in 20 chance to ask random peer instead
// also works well for empty SuspectedParticipants
if ((_rng()%20) == 0) {
tox_peers.clear();
// or messages? should be the same
if (!ce.all_of<Components::AnnounceTargets>()) {
// rip
std::cerr << "SHA1_NGCFT1 warning: tried random, but no AnnounceTargets\n";
return std::nullopt;
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
}
}
// 1 in 40 chance to ask random peer instead
// TODO: config + tweak
// TODO: save group in content to avoid the tox_peers list build
// TODO: remove once pc1_announce is shipped
if (tox_peers.empty() || (_rng()%40) == 0) {
// meh
// HACK: determain group based on last tox_peers
if (!tox_peers.empty()) {
const uint32_t group_number = tox_peers.back().first;
auto gch = _tcm.getContactGroup(group_number);
assert(static_cast<bool>(gch));
std::vector<uint32_t> un_tox_peers;
for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
for (const auto& target : ce.get<Components::AnnounceTargets>().targets) {
for (const auto child : cr.get<Contact::Components::ParentOf>(target).subs) {
if (const auto* cs = cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
if (_cr.all_of<Contact::Components::TagSelfStrong>(child)) {
if (cr.all_of<Contact::Components::TagSelfStrong>(child)) {
continue; // skip self
}
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
un_tox_peers.push_back(tgpe.peer_number);
if (cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
const auto& tgpe = cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
}
}
if (un_tox_peers.empty()) {
// no one online, we are out of luck
} else {
const size_t sample_i = _rng()%un_tox_peers.size();
const auto peer_number = un_tox_peers.at(sample_i);
}
std::cout << "SHA1_NGCFT1: doing random peer select over " << tox_peers.size() << " peers\n";
} else if (ce.all_of<Components::SuspectedParticipants>()) {
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
// TODO: sort by con state?
// prio to direct?
if (const auto* cs = cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
continue;
}
return std::make_pair(group_number, peer_number);
if (cr.all_of<Contact::Components::TagSelfStrong>(c)) {
// FIXME: how did we select ourselfs to be a suspected participant
continue;
}
if (cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
const auto& tgpe = cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
}
}
} else {
const size_t sample_i = _rng()%tox_peers.size();
const auto [group_number, peer_number] = tox_peers.at(sample_i);
return std::make_pair(group_number, peer_number);
}
return std::nullopt;
if (tox_peers.empty()) {
return std::nullopt;
}
const size_t sample_i = _rng()%tox_peers.size();
const auto [group_number, peer_number] = tox_peers.at(sample_i);
return std::make_pair(group_number, peer_number);
}
void SHA1_NGCFT1::queueBitsetSendFull(Contact3Handle c, ObjectHandle o) {
void SHA1_NGCFT1::queueBitsetSendFull(ContactHandle4 c, ObjectHandle o) {
if (!static_cast<bool>(c) || !static_cast<bool>(o)) {
assert(false);
return;
@ -161,39 +173,44 @@ File2I* SHA1_NGCFT1::objGetFile2Write(ObjectHandle o) {
auto* file2_comp_ptr = o.try_get<Components::FT1File2>();
if (file2_comp_ptr == nullptr || !file2_comp_ptr->file || !file2_comp_ptr->file->can_write || !file2_comp_ptr->file->isGood()) {
// (re)request file2 from backend
auto new_file = _mfb.file2(o, StorageBackendI::FILE2_WRITE);
auto new_file = _mfb.file2(o, StorageBackendIFile2::FILE2_WRITE);
if (!new_file || !new_file->can_write || !new_file->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed to open object for writing\n";
return nullptr; // early out
}
file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file));
file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file), getTimeNow());
}
assert(file2_comp_ptr != nullptr);
assert(static_cast<bool>(file2_comp_ptr->file));
file2_comp_ptr->last_activity_ts = getTimeNow();
return file2_comp_ptr->file.get();
}
File2I* SHA1_NGCFT1::objGetFile2Read(ObjectHandle o) {
auto* file2_comp_ptr = o.try_get<Components::FT1File2>();
if (file2_comp_ptr == nullptr || !file2_comp_ptr->file || !file2_comp_ptr->file->can_read || !file2_comp_ptr->file->isGood()) {
std::cout << "SHA1_NGCFT1: (re)opening object " << entt::to_integral(entt::to_entity(o.entity())) << " for reading\n";
// (re)request file2 from backend
auto new_file = _mfb.file2(o, StorageBackendI::FILE2_READ);
auto new_file = _mfb.file2(o, StorageBackendIFile2::FILE2_READ);
if (!new_file || !new_file->can_read || !new_file->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed to open object for reading\n";
return nullptr; // early out
}
file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file));
file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file), getTimeNow());
}
assert(file2_comp_ptr != nullptr);
assert(static_cast<bool>(file2_comp_ptr->file));
file2_comp_ptr->last_activity_ts = getTimeNow();
return file2_comp_ptr->file.get();
}
SHA1_NGCFT1::SHA1_NGCFT1(
ObjectStore2& os,
Contact3Registry& cr,
ContactStore4I& cs,
RegistryMessageModelI& rmm,
NGCFT1& nft,
ToxContactModel2& tcm,
@ -202,7 +219,7 @@ SHA1_NGCFT1::SHA1_NGCFT1(
) :
_os(os),
_os_sr(_os.newSubRef(this)),
_cr(cr),
_cs(cs),
_rmm(rmm),
_rmm_sr(_rmm.newSubRef(this)),
_nft(nft),
@ -247,8 +264,7 @@ SHA1_NGCFT1::SHA1_NGCFT1(
}
float SHA1_NGCFT1::iterate(float delta) {
//std::cerr << "---------- new tick ----------\n";
_mfb.tick(); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast
_mfb.tick(getTimeNow()); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast
_peer_open_requests.clear();
@ -293,7 +309,6 @@ float SHA1_NGCFT1::iterate(float delta) {
assert(!o.any_of<ObjComp::F::TagLocalHaveAll>());
_queue_content_want_info.push_back(o);
//_os.registry().remove<Components::ReRequestInfoTimer>(e);
o.remove<Components::ReRequestInfoTimer>();
// TODO: throw update?
}
@ -315,7 +330,7 @@ float SHA1_NGCFT1::iterate(float delta) {
}
}
Systems::re_announce(_os.registry(), _cr, _neep, delta);
Systems::re_announce(_os.registry(), _cs, _neep, delta);
{ // send out bitsets
// currently 1 per tick
@ -420,6 +435,8 @@ float SHA1_NGCFT1::iterate(float delta) {
assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
assert(ce.all_of<Components::FT1InfoSHA1Hash>());
//std::cout << "SHA1_NGCFT1: trying to request info\n";
auto selected_peer_opt = selectPeerForRequest(ce);
if (selected_peer_opt.has_value()) {
const auto [group_number, peer_number] = selected_peer_opt.value();
@ -434,10 +451,12 @@ float SHA1_NGCFT1::iterate(float delta) {
);
ce.emplace<Components::ReRequestInfoTimer>(0.f);
_queue_content_want_info.pop_front();
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
} else {
_queue_content_want_info.push_back(ce);
}
_queue_content_want_info.pop_front();
}
}
@ -445,7 +464,7 @@ float SHA1_NGCFT1::iterate(float delta) {
// new chunk picker code
// TODO: need to either split up or remove some things here
Systems::chunk_picker_updates(
_cr,
_cs,
_os.registry(),
_peer_open_requests,
_receiving_transfers,
@ -453,20 +472,26 @@ float SHA1_NGCFT1::iterate(float delta) {
delta
);
_file_inactivity_timer += delta;
if (_file_inactivity_timer >= 21.554f) {
_file_inactivity_timer = 0.f;
Systems::file_inactivity(_os.registry(), getTimeNow());
}
// transfer statistics systems
Systems::transfer_tally_update(_os.registry(), getTimeNow());
if (_peer_open_requests.empty()) {
return 2.f;
} else {
// pretty conservative and should be ajusted on a per peer, per delay basis
// seems to do the trick
return 0.05f;
// ft1 will go lower for us, if we have unresolved info,
// we dont want to be stuck in a high tickrate
return 0.5f;
}
}
// gets called back on main thread after a "new" file info got built on a different thread
void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts) {
void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact4 c, uint64_t ts) {
// sanity
if (!o.all_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Hash>()) {
assert(false);
@ -495,7 +520,7 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
// something happend, update all chunk pickers
if (o.all_of<Components::SuspectedParticipants>()) {
for (const auto& pcv : o.get<Components::SuspectedParticipants>().participants) {
Contact3Handle pch{_cr, pcv};
ContactHandle4 pch = _cs.contactHandle(pcv);
assert(static_cast<bool>(pch));
pch.emplace_or_replace<ChunkPickerUpdateTag>();
}
@ -504,9 +529,11 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
// in both cases, private and public, c (contact to) is the target
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c);
const auto& cr = _cs.registry();
// create message
const auto c_self = _cr.get<Contact::Components::Self>(c).self;
if (!_cr.valid(c_self)) {
const auto c_self = cr.get<Contact::Components::Self>(c).self;
if (!cr.valid(c_self)) {
std::cerr << "SHA1_NGCFT1 error: failed to get self!\n";
return;
}
@ -527,8 +554,8 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
// file id would be sha1_info hash or something
//reg_ptr->emplace<Message::Components::Transfer::FileID>(e, file_id);
if (_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
const uint32_t group_number = _cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
if (cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
const uint32_t group_number = cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
uint32_t message_id = 0;
// TODO: check return
@ -536,7 +563,7 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
} else if (
// non online group
_cr.any_of<Contact::Components::ToxGroupPersistent>(c)
cr.any_of<Contact::Components::ToxGroupPersistent>(c)
) {
// create msg_id
const uint32_t message_id = randombytes_random();
@ -552,7 +579,79 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
updateMessages(o); // nop // TODO: remove
}
ObjectHandle SHA1_NGCFT1::constructFileMessageInPlace(Message3Handle msg, NGCFT1_file_kind file_kind, ByteSpan file_id) {
if (file_kind != NGCFT1_file_kind::HASH_SHA1_INFO) {
return {};
}
// check if content exists
const std::vector<uint8_t> sha1_info_hash{file_id.cbegin(), file_id.cend()};
ObjectHandle o;
if (_info_to_content.count(sha1_info_hash)) {
o = _info_to_content.at(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has existing content\n";
} else {
// TODO: backend
o = _mfb.newObject(ByteSpan{sha1_info_hash});
_info_to_content[sha1_info_hash] = o;
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has new content\n";
}
o.get_or_emplace<Components::Messages>().messages.push_back(msg);
msg.emplace_or_replace<Message::Components::MessageFileObject>(o);
// TODO: use to_c instead?
if (const auto* from_c_comp = msg.try_get<Message::Components::ContactFrom>(); from_c_comp != nullptr && _cs.registry().valid(from_c_comp->c)) {
ContactHandle4 c = _cs.contactHandle(from_c_comp->c);
// TODO: check if public
// since public
if (c.all_of<Contact::Components::Parent>()) {
// TODO: if this is a dummy contact, should it have parent?
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
} else {
std::cerr << "SHA1_NGCFT1 warning: from contact has no parent, cant add to AnnounceTargets\n";
}
// TODO: if private, add c directly
}
// queue announce that we are participating
o.get_or_emplace<Components::ReAnnounceTimer>(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
// TODO: queue info dl
// TODO: queue info/check if we already have info
if (!o.all_of<Components::ReRequestInfoTimer>() && !o.all_of<Components::FT1InfoSHA1>()) {
bool in_info_want {false};
for (const auto it : _queue_content_want_info) {
if (it == o) {
in_info_want = true;
break;
}
}
if (!in_info_want) {
// TODO: check if already receiving
_queue_content_want_info.push_back(o);
}
} else if (o.all_of<Components::FT1InfoSHA1>()){
// remove from info want
o.remove<Components::ReRequestInfoTimer>();
auto it = std::find(_queue_content_want_info.cbegin(), _queue_content_want_info.cend(), o);
if (it != _queue_content_want_info.cend()) {
_queue_content_want_info.erase(it);
}
}
_os.throwEventUpdate(o);
return o;
}
bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
if (_object_update_lock) {
return false;
}
if (!e.e.all_of<ObjComp::Ephemeral::File::ActionTransferAccept>()) {
return false;
}
@ -561,10 +660,17 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
// not ready to load yet, skip
return false;
}
if (e.e.all_of<Components::FT1ChunkSHA1Cache>()) {
// now we update on recv_done, which included info
std::cerr << "SHA1_NGCFT1 warning: accepted but already has FT1ChunkSHA1Cache\n";
return false;
}
_object_update_lock = true;
assert(!e.e.all_of<ObjComp::F::TagLocalHaveAll>());
assert(!e.e.all_of<Components::FT1ChunkSHA1Cache>());
assert(!e.e.all_of<Components::FT1File2>());
//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path);
// first, open file for write(+readback)
std::string full_file_path{e.e.get<ObjComp::Ephemeral::File::ActionTransferAccept>().save_to_path};
@ -589,6 +695,7 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
// we failed opening that filepath, so we should offer the user the oportunity to save it differently
e.e.remove<ObjComp::Ephemeral::File::ActionTransferAccept>(); // stop
_object_update_lock = false;
return false;
}
@ -646,7 +753,7 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
}
}
e.e.emplace_or_replace<Components::FT1File2>(std::move(file_impl));
e.e.emplace_or_replace<Components::FT1File2>(std::move(file_impl), getTimeNow());
// queue announce that we are participating
e.e.get_or_emplace<Components::ReAnnounceTimer>(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
@ -655,9 +762,9 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
// start requesting from all participants
if (e.e.all_of<Components::SuspectedParticipants>()) {
std::cout << "accepted ft has " << e.e.get<Components::SuspectedParticipants>().participants.size() << " sp\n";
std::cout << "SHA1_NGCFT1: accepted ft has " << e.e.get<Components::SuspectedParticipants>().participants.size() << " sp\n";
for (const auto cv : e.e.get<Components::SuspectedParticipants>().participants) {
_cr.emplace_or_replace<ChunkPickerUpdateTag>(cv);
_cs.registry().emplace_or_replace<ChunkPickerUpdateTag>(cv);
}
} else {
std::cout << "accepted ft has NO sp!\n";
@ -667,6 +774,7 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
updateMessages(e.e);
_object_update_lock = false;
return false; // ?
}
@ -868,7 +976,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
if (!_receiving_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
std::cerr << "SHA1_NGCFT1 warning: unknown transfer " << (int)e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n";
// not ours
//std::cerr << "SHA1_NGCFT1 warning: unknown transfer " << (int)e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n";
return false;
}
@ -898,7 +1007,16 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
}
}
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
ContactHandle4 c;
const auto tpcc_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
if (tpcc_it != _tox_peer_to_contact.cend()) {
c = tpcc_it->second;
} else {
c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (static_cast<bool>(c)) {
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c;
}
}
if (static_cast<bool>(c)) {
o.get_or_emplace<Components::TransferStatsTally>()
.tally[c]
@ -920,7 +1038,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
std::cerr << "SHA1_NGCFT1 error: ngcft1 requested data for unknown transfer\n";
// not ours
//std::cerr << "SHA1_NGCFT1 error: ngcft1 requested data for unknown transfer\n";
return false;
}
@ -934,9 +1053,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
}
} else if (transfer.isChunk()) {
auto& chunk_transfer = transfer.getChunk();
const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
const auto& info = chunk_transfer.o.get<Components::FT1InfoSHA1>();
auto* file2 = objGetFile2Read(chunk_transfer.content);
auto* file2 = objGetFile2Read(chunk_transfer.o);
if (file2 == nullptr) {
// return true?
return false; // early out
@ -955,7 +1074,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
// TODO: add event to propergate to messages
//_rmm.throwEventUpdate(transfer); // should we?
Contact3Handle c;
ContactHandle4 c;
const auto tpcc_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
if (tpcc_it != _tox_peer_to_contact.cend()) {
c = tpcc_it->second;
@ -966,7 +1085,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
}
}
if (static_cast<bool>(c)) {
chunk_transfer.content.get_or_emplace<Components::TransferStatsTally>()
chunk_transfer.o.get_or_emplace<Components::TransferStatsTally>()
.tally[c]
.recently_sent
.push_back(
@ -1033,11 +1152,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
o.emplace_or_replace<ObjComp::Ephemeral::File::TagTransferPaused>();
_os.throwEventUpdate(o);
updateMessages(o);
} else if (transfer.isChunk()) {
auto o = transfer.getChunk().content;
const auto& info = o.get<Components::FT1InfoSHA1>();
auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
auto& cc = o.get<Components::FT1ChunkSHA1Cache>(); // is this assumption save?
// HACK: only check first chunk (they *should* all be the same)
const auto chunk_index = transfer.getChunk().chunk_indices.front();
@ -1086,10 +1207,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
o.emplace_or_replace<ObjComp::F::TagLocalHaveAll>();
std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";
// HACK: close file2, to clear ram
// TODO: just add a lastActivity comp and close files every x minutes based on that
file2 = nullptr; // making sure we dont have a stale ptr
o.remove<Components::FT1File2>(); // will be recreated on demand
// close file, as we likely no longer needs the write access we likely had
file2 = nullptr;
o.remove<Components::FT1File2>();
break;
}
}
@ -1098,14 +1218,16 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
o.remove<ObjComp::F::LocalHaveBitset>(); // save space
}
const auto& cr = _cs.registry();
// queue chunk have for all participants
// HACK: send immediatly to all participants
for (const auto c_part : o.get<Components::SuspectedParticipants>().participants) {
if (!_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c_part)) {
if (!cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c_part)) {
continue;
}
const auto [part_group_number, part_peer_number] = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c_part);
const auto [part_group_number, part_peer_number] = cr.get<Contact::Components::ToxGroupPeerEphemeral>(c_part);
const auto& info_hash = o.get<Components::FT1InfoSHA1Hash>().hash;
@ -1145,6 +1267,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
}
_os.throwEventUpdate(o);
updateMessages(o); // mostly for received bytes
}
@ -1161,11 +1285,15 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
auto& transfer = _sending_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id);
if (transfer.isChunk()) {
updateMessages(transfer.getChunk().content); // mostly for sent bytes
}
// we could cheat here and assume remote has chunk now
_os.throwEventUpdate(transfer.getChunk().o);
updateMessages(transfer.getChunk().o); // mostly for sent bytes
} // ignore info transfer for now
_sending_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id);
return true;
}
@ -1181,7 +1309,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
return false; // return true?
}
uint64_t ts = Message::getTimeMS();
uint64_t ts = getTimeMS();
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
@ -1228,70 +1356,22 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
rb.try_emplace(self_c, ts);
}
// check if content exists
const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
ObjectHandle o;
if (_info_to_content.count(sha1_info_hash)) {
o = _info_to_content.at(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has existing content\n";
} else {
// TODO: backend
o = _mfb.newObject(ByteSpan{sha1_info_hash});
_info_to_content[sha1_info_hash] = o;
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has new content\n";
auto o = constructFileMessageInPlace({reg, new_msg_e}, e.file_kind, {e.file_id, e.file_id_size});
if (o) {
// the other peer just sent the file message, so it is likely they have the file
addParticipation(c, o);
}
o.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
reg_ptr->emplace<Message::Components::MessageFileObject>(new_msg_e, o);
// HACK: assume the message sender is participating. usually a safe bet.
if (addParticipation(c, o)) {
// something happend, update chunk picker
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
// HACK: assume the message sender has all
o.get_or_emplace<Components::RemoteHaveBitset>().others[c] = {true, {}};
// TODO: queue info dl
// TODO: queue info/check if we already have info
if (!o.all_of<Components::ReRequestInfoTimer>() && !o.all_of<Components::FT1InfoSHA1>()) {
bool in_info_want {false};
for (const auto it : _queue_content_want_info) {
if (it == o) {
in_info_want = true;
break;
}
}
if (!in_info_want) {
// TODO: check if already receiving
_queue_content_want_info.push_back(o);
}
} else if (o.all_of<Components::FT1InfoSHA1>()){
// remove from info want
o.remove<Components::ReRequestInfoTimer>();
auto it = std::find(_queue_content_want_info.cbegin(), _queue_content_want_info.cend(), o);
if (it != _queue_content_want_info.cend()) {
_queue_content_want_info.erase(it);
}
}
// since public
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
_os.throwEventUpdate(o);
_rmm.throwEventConstruct(reg, new_msg_e);
return true; // false?
}
bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) {
bool SHA1_NGCFT1::sendFilePath(const Contact4 c, std::string_view file_name, std::string_view file_path) {
if (
// TODO: add support of offline queuing
!_cr.all_of<Contact::Components::ToxGroupEphemeral>(c)
!_cs.registry().all_of<Contact::Components::ToxGroupEphemeral>(c)
) {
return false;
}
@ -1304,7 +1384,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
}
// get current time unix epoch utc
uint64_t ts = Message::getTimeMS();
uint64_t ts = getTimeMS();
_mfb.newFromFile(
file_name, file_path,
@ -1356,8 +1436,8 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
for (const auto& [_, o] : _info_to_content) {
removeParticipation(c, o);
if (o.all_of<Components::RemoteHaveBitset>()) {
o.get<Components::RemoteHaveBitset>().others.erase(c);
if (o.all_of<ObjComp::F::RemoteHaveBitset>()) {
o.get<ObjComp::F::RemoteHaveBitset>().others.erase(c);
}
}
}
@ -1409,21 +1489,22 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
//c.emplace_or_replace<ChunkPickerUpdateTag>();
}
if (!o.all_of<Components::FT1InfoSHA1>()) {
// we dont have the info yet
return true;
}
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
auto& remote_have = o.get_or_emplace<ObjComp::F::RemoteHaveBitset>().others;
if (!remote_have.contains(c)) {
// init
remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
remote_have.emplace(c, ObjComp::F::RemoteHaveBitset::Entry{false, num_total_chunks});
// new have? nice
//c.emplace_or_replace<ChunkPickerUpdateTag>();
}
if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
return true; // we dont care beyond this point
}
auto& remote_have_peer = remote_have.at(c);
if (remote_have_peer.have_all) {
return true; // peer somehow already had all, ignoring
@ -1444,11 +1525,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
a_valid_change = true;
}
if (a_valid_change) {
// new have? nice
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
// check for completion?
// TODO: optimize
bool test_all {true};
@ -1465,6 +1541,15 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
remote_have_peer.have = BitSet{};
}
if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
return true; // we dont care beyond this point
}
if (a_valid_change) {
// new have? nice
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
return true;
}
@ -1494,6 +1579,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
std::cerr << "SHA1_NGCFT1 error: tracking info has null object\n";
return false;
}
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
assert(static_cast<bool>(c));
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
// we might not know yet
addParticipation(c, o);
if (!o.all_of<Components::FT1InfoSHA1>()) {
// we dont have the info yet
return true;
}
const size_t num_total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
// +7 for byte rounding
@ -1503,22 +1599,16 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
return false;
}
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
assert(static_cast<bool>(c));
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
// we might not know yet
addParticipation(c, o);
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
auto& remote_have = o.get_or_emplace<ObjComp::F::RemoteHaveBitset>().others;
if (!remote_have.contains(c)) {
// init
remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
remote_have.emplace(c, ObjComp::F::RemoteHaveBitset::Entry{false, num_total_chunks});
}
auto& remote_have_peer = remote_have.at(c);
if (!remote_have_peer.have_all) { // TODO: maybe unset with bitset?
BitSet event_bitset{e.chunk_bitset};
// TODO: range replace instead
remote_have_peer.have.merge(event_bitset, e.start_chunk);
// check for completion?
@ -1573,8 +1663,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have_all& e) {
// we might not know yet
addParticipation(c, o);
auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
remote_have[c] = Components::RemoteHaveBitset::Entry{true, {}};
auto& remote_have = o.get_or_emplace<ObjComp::F::RemoteHaveBitset>().others;
remote_have[c] = ObjComp::F::RemoteHaveBitset::Entry{true, {}};
// new have? nice
c.emplace_or_replace<ChunkPickerUpdateTag>();
@ -1621,15 +1711,21 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) {
// something happend, update chunk picker
// !!! this is probably too much
assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>();
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
c.emplace_or_replace<ChunkPickerUpdateTag>();
}
std::cout << "SHA1_NGCFT1: and we where interested!\n";
// we should probably send the bitset back here / add to queue (can be multiple packets)
if (o.all_of<Components::FT1ChunkSHA1Cache>() && o.get<Components::FT1ChunkSHA1Cache>().have_count > 0) {
queueBitsetSendFull(c, o);
} else if (o.all_of<Components::ReAnnounceTimer>()) {
o.get<Components::ReAnnounceTimer>().lower();
}
}
// return true instead?
return false;
}

View File

@ -3,7 +3,7 @@
// solanaceae port of sha1 fts for NGCFT1
#include <solanaceae/object_store/object_store.hpp>
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/contact/fwd.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
@ -24,7 +24,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
ObjectStore2& _os;
ObjectStore2::SubscriptionReference _os_sr;
// TODO: backend abstraction
Contact3Registry& _cr;
ContactStore4I& _cs;
RegistryMessageModelI& _rmm;
RegistryMessageModelI::SubscriptionReference _rmm_sr;
NGCFT1& _nft;
@ -37,6 +37,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
Backends::SHA1MappedFilesystem _mfb;
bool _object_update_lock {false};
std::minstd_rand _rng {1337*11};
using clock = std::chrono::steady_clock;
@ -65,27 +67,29 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
std::deque<ObjectHandle> _queue_content_want_info;
struct QBitsetEntry {
Contact3Handle c;
ContactHandle4 c;
ObjectHandle o;
};
std::deque<QBitsetEntry> _queue_send_bitset;
// FIXME: workaround missing contact events
// only used to remove participation on peer exit
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
// only used on peer exit (no, also used to quicken lookups)
entt::dense_map<uint64_t, ContactHandle4> _tox_peer_to_contact;
// reset every iterate; kept here as an allocation optimization
entt::dense_map<Contact3, size_t> _peer_open_requests;
entt::dense_map<Contact4, size_t> _peer_open_requests;
void updateMessages(ObjectHandle ce);
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ObjectHandle ce);
void queueBitsetSendFull(Contact3Handle c, ObjectHandle o);
void queueBitsetSendFull(ContactHandle4 c, ObjectHandle o);
File2I* objGetFile2Write(ObjectHandle o);
File2I* objGetFile2Read(ObjectHandle o);
float _file_inactivity_timer {0.f};
public: // TODO: config
bool _udp_only {false};
@ -95,7 +99,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
public:
SHA1_NGCFT1(
ObjectStore2& os,
Contact3Registry& cr,
ContactStore4I& cs,
RegistryMessageModelI& rmm,
NGCFT1& nft,
ToxContactModel2& tcm,
@ -105,10 +109,13 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
float iterate(float delta);
void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts);
void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact4 c, uint64_t ts);
// construct the file part in a partially constructed message
ObjectHandle constructFileMessageInPlace(Message3Handle msg, NGCFT1_file_kind file_kind, ByteSpan file_id);
protected: // rmm events (actions)
bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override;
bool sendFilePath(const Contact4 c, std::string_view file_name, std::string_view file_path) override;
protected: // os events (actions)
bool onEvent(const ObjectStore::Events::ObjectUpdate&) override;

View File

@ -12,7 +12,7 @@ void transfer_tally_update(ObjectRegistry& os_reg, const float time_now) {
// for each tally -> stats separated
os_reg.view<Components::TransferStatsTally>().each([&os_reg, time_now, &tally_to_remove](const auto ov, Components::TransferStatsTally& tally_comp) {
// for each peer
std::vector<Contact3> to_remove;
std::vector<Contact4> to_remove;
for (auto&& [peer_c, peer] : tally_comp.tally) {
auto& tss = os_reg.get_or_emplace<Components::TransferStatsSeparated>(ov).stats;

View File

@ -0,0 +1,534 @@
#include "./ngc_hs2_rizzler.hpp"
#include <solanaceae/contact/contact_store_i.hpp>
#include <solanaceae/contact/components.hpp>
#include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
#include <solanaceae/message3/contact_components.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/message3/components.hpp>
#include <solanaceae/tox_messages/msg_components.hpp>
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
// TODO: move somewhere else?
#include <solanaceae/ngc_ft1_sha1/util.hpp>
#include <solanaceae/util/span.hpp>
#include <solanaceae/util/time.hpp>
#include <entt/entity/entity.hpp>
#include <nlohmann/json.hpp>
#include "./serl.hpp"
#include <cstdint>
#include <deque>
#include <cstring>
#include <iostream>
// TODO: move to own file
namespace Components {
struct RequestedChatLogs {
struct Entry {
uint64_t ts_start;
uint64_t ts_end;
//std::vector<uint8_t> fid; // ?
};
std::deque<Entry> list;
bool contains(uint64_t ts_start, uint64_t ts_end);
void addRequest(uint64_t ts_start, uint64_t ts_end);
};
struct RunningChatLogs {
struct Entry {
uint64_t ts_start;
uint64_t ts_end;
std::vector<uint8_t> data;
float last_activity {0.f};
};
// list of transfers
entt::dense_map<uint8_t, Entry> list;
};
bool RequestedChatLogs::contains(uint64_t ts_start, uint64_t ts_end) {
auto it = std::find_if(list.cbegin(), list.cend(), [ts_start, ts_end](const auto& value) {
return value.ts_start == ts_start && value.ts_end == ts_end;
});
return it != list.cend();
}
void RequestedChatLogs::addRequest(uint64_t ts_start, uint64_t ts_end) {
if (contains(ts_start, ts_end)) {
return; // pre existing
}
list.push_back(Entry{ts_start, ts_end});
}
} // Components
NGCHS2Rizzler::NGCHS2Rizzler(
ContactStore4I& cs,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
NGCFT1& nft,
ToxEventProviderI& tep,
SHA1_NGCFT1& sha1_nft
) :
_cs(cs),
_rmm(rmm),
_tcm(tcm),
_nft(nft),
_nftep_sr(_nft.newSubRef(this)),
_tep_sr(tep.newSubRef(this)),
_sha1_nft(sha1_nft)
{
_nftep_sr
.subscribe(NGCFT1_Event::recv_init)
.subscribe(NGCFT1_Event::recv_data)
.subscribe(NGCFT1_Event::recv_done)
;
_tep_sr
.subscribe(Tox_Event_Type::TOX_EVENT_GROUP_PEER_JOIN)
;
}
NGCHS2Rizzler::~NGCHS2Rizzler(void) {
}
float NGCHS2Rizzler::iterate(float delta) {
for (auto it = _request_queue.begin(); it != _request_queue.end();) {
it->second.timer += delta;
if (it->second.timer < it->second.delay) {
it++;
continue;
}
const ContactHandle4 c = _cs.contactHandle(it->first);
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
// peer nolonger online
it = _request_queue.erase(it);
continue;
}
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
// now in sec
const uint64_t ts_now = getTimeMS()/1000;
const uint64_t ts_start = ts_now;
const uint64_t ts_end = ts_now-(60*60*48);
if (sendRequest(group_number, peer_number, ts_start, ts_end)) {
// TODO: requeue
// TODO: segment
// TODO: dont request already received ranges
//// on success, requeue with longer delay (minutes)
//it->second.timer = 0.f;
//it->second.delay = _delay_next_request_min + _rng_dist(_rng)*_delay_next_request_add;
//// double the delay for overlap (9m-15m)
//// TODO: finetune
//it->second.sync_delta = uint8_t((it->second.delay/60.f)*2.f) + 1;
//std::cout << "ZOX #### requeued request in " << it->second.delay << "s\n";
auto& rcl = c.get_or_emplace<Components::RequestedChatLogs>();
rcl.addRequest(ts_start, ts_end);
} else {
// on failure, assume disconnected
}
// remove from request queue
it = _request_queue.erase(it);
}
return 1000.f;
}
bool NGCHS2Rizzler::sendRequest(
uint32_t group_number, uint32_t peer_number,
uint64_t ts_start, uint64_t ts_end
) {
std::cout << "NGCHS2Rizzler: sending request to " << group_number << ":" << peer_number << " (" << ts_start << "," << ts_end << ")\n";
// build fid
std::vector<uint8_t> fid;
fid.reserve(sizeof(uint64_t)+sizeof(uint64_t));
serlSimpleType(fid, ts_start);
serlSimpleType(fid, ts_end);
assert(fid.size() == sizeof(uint64_t)+sizeof(uint64_t));
return _nft.NGC_FT1_send_request_private(
group_number, peer_number,
(uint32_t)NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK,
fid.data(), fid.size() // fid
);
}
void NGCHS2Rizzler::handleMsgPack(ContactHandle4 sync_by_c, const std::vector<uint8_t>& data) {
assert(sync_by_c);
auto* reg_ptr = _rmm.get(sync_by_c);
if (reg_ptr == nullptr) {
std::cerr << "NGCHS2Rizzler error: group without msg reg\n";
return;
}
Message3Registry& reg = *reg_ptr;
uint64_t now_ts = getTimeMS();
std::cout << "NGCHS2Rizzler: start parsing msgpack chatlog from " << entt::to_integral(sync_by_c.entity()) << "\n";
try {
const auto j = nlohmann::json::from_msgpack(data);
if (!j.is_array()) {
std::cerr << "NGCHS2Rizzler error: chatlog not array\n";
return;
}
std::cout << "NGCHS2Rizzler: chatlog has " << j.size() << " entries\n";
for (const auto j_entry : j) {
try {
// deci seconds
uint64_t ts = j_entry.at("ts");
// TODO: check against ts range
ts *= 100; // convert to ms
const uint64_t max_future_ms = 1u*60u*1000u; // accept up to 1 minute into the future
if (ts - max_future_ms > now_ts) {
// message is too far into the future
continue;
}
const auto& j_ppk = j_entry.at("ppk");
uint32_t mid = j_entry.at("mid");
if (
!(j_entry.count("text")) &&
!(j_entry.count("fkind") && j_entry.count("fid"))
) {
std::cerr << "NGCHS2Rizzler error: msg neither contains text nor file fields\n";
continue;
}
Contact4 from_c{entt::null};
{ // from_c
std::vector<uint8_t> id;
if (j_ppk.is_binary()) {
id = j_ppk.get_binary();
} else {
j_ppk.at("bytes").get_to(id);
}
const auto parent = sync_by_c.get<Contact::Components::Parent>().parent;
from_c = _cs.getOneContactByID(parent, ByteSpan{id});
auto& cr = _cs.registry();
if (!cr.valid(from_c)) {
// create sparse contact with id only
from_c = cr.create();
cr.emplace_or_replace<Contact::Components::ID>(from_c, id);
// TODO: only if public message
cr.emplace_or_replace<Contact::Components::Parent>(from_c, parent);
_cs.throwEventConstruct(from_c);
}
}
// TODO: from_c perm check
// hard to do without numbers
Message3Handle new_real_msg{reg, reg.create()};
new_real_msg.emplace<Message::Components::Timestamp>(ts); // reactive?
new_real_msg.emplace<Message::Components::ContactFrom>(from_c);
new_real_msg.emplace<Message::Components::ContactTo>(sync_by_c.get<Contact::Components::Parent>().parent);
new_real_msg.emplace<Message::Components::ToxGroupMessageID>(mid);
if (j_entry.contains("action") && static_cast<bool>(j_entry.at("action"))) {
new_real_msg.emplace<Message::Components::TagMessageIsAction>();
}
if (j_entry.contains("text")) {
const std::string& text = j_entry.at("text");
new_real_msg.emplace<Message::Components::MessageText>(text);
#if 0
std::cout
<< "msg ts:" << ts
//<< " ppk:" << j_ppk
<< " mid:" << mid
<< " type:" << type
<< " text:" << text
<< "\n"
;
#endif
} else if (j_entry.contains("fkind") && j_entry.contains("fid")) {
uint32_t fkind = j_entry.at("fkind");
const auto& j_fid = j_entry.at("fid");
std::vector<uint8_t> fid;
if (j_fid.is_binary()) {
fid = j_fid.get_binary();
} else {
j_fid.at("bytes").get_to(fid);
}
if (fkind == (uint32_t)NGCFT1_file_kind::HASH_SHA1_INFO) {
_sha1_nft.constructFileMessageInPlace(
new_real_msg,
NGCFT1_file_kind::HASH_SHA1_INFO,
ByteSpan{fid}
);
} else {
std::cerr << "NGCHS2Rizzler error: unknown file kind " << fkind << "\n";
}
#if 0
std::cout
<< "msg ts:" << ts
//<< " ppk:" << j_ppk
<< " mid:" << mid
<< " type:" << type
<< " fkind:" << fkind
<< " fid:" << j_fid
<< "\n"
;
#endif
}
// now check against pre existing
// TODO: dont do this afterwards
Message3Handle dup_msg{};
{ // check preexisting
// get comparator from contact
const ContactHandle4 reg_c = _cs.contactHandle(reg.ctx().get<Contact4>());
if (reg_c.all_of<Contact::Components::MessageIsSame>()) {
auto& comp = reg_c.get<Contact::Components::MessageIsSame>().comp;
// walking EVERY existing message OOF
// this needs optimizing
for (const Message3 other_msg : reg.view<Message::Components::Timestamp, Message::Components::ContactFrom, Message::Components::ContactTo>()) {
if (other_msg == new_real_msg) {
continue; // skip self
}
if (comp({reg, other_msg}, new_real_msg)) {
// dup
dup_msg = {reg, other_msg};
break;
}
}
} // else, default heuristic??
}
Message3Handle new_msg = new_real_msg;
if (dup_msg) {
// we leak objects here (if file)
reg.destroy(new_msg);
new_msg = dup_msg;
}
{ // by whom
auto& synced_by = new_msg.get_or_emplace<Message::Components::SyncedBy>().ts;
// dont overwrite
synced_by.try_emplace(sync_by_c, now_ts);
}
{ // now we also know they got the message
auto& list = new_msg.get_or_emplace<Message::Components::ReceivedBy>().ts;
// dont overwrite
list.try_emplace(sync_by_c, now_ts);
}
if (new_msg == dup_msg) {
// TODO: maybe update a timestamp?
_rmm.throwEventUpdate(reg, new_msg);
} else {
// pure new msg
new_msg.emplace<Message::Components::TimestampProcessed>(now_ts);
new_msg.emplace<Message::Components::TimestampWritten>(ts);
new_msg.emplace<Message::Components::TagUnread>();
_rmm.throwEventConstruct(reg, new_msg);
}
} catch (...) {
std::cerr << "NGCHS2Rizzler error: parsing entry '" << j_entry.dump() << "'\n";
}
}
} catch (...) {
std::cerr << "NGCHS2Rizzler error: failed parsing data as msgpack\n";
}
}
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_init& e) {
if (e.file_kind != NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK) {
return false; // not for us
}
std::cout << "NGCHS2Rizzler: recv_init " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (!c) {
return false; // huh?
}
if (!c.all_of<Components::RequestedChatLogs>()) {
return false;
}
// parse start end
// TODO: extract
ByteSpan fid{e.file_id, e.file_id_size};
// TODO: better size check
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
std::cerr << "NGCHS2S error: range not lange enough\n";
return true;
}
// seconds
uint64_t ts_start{0};
uint64_t ts_end{0};
// parse
try {
ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)};
ts_start = deserlTS(ts_start_bytes);
ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)};
ts_end = deserlTS(ts_end_bytes);
} catch (...) {
std::cerr << "NGCHS2R error: failed to parse range\n";
return true;
}
if (ts_end >= ts_start) {
std::cerr << "NGCHS2R error: end not < start\n";
return true;
}
auto& reqcl = c.get<Components::RequestedChatLogs>();
if (!reqcl.contains(ts_start, ts_end)) {
// warn?
return true;
}
auto& rnncl = c.get_or_emplace<Components::RunningChatLogs>();
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // cache
auto& transfer = rnncl.list[e.transfer_id];
transfer.data.reserve(e.file_size); // danger?
transfer.last_activity = 0.f;
transfer.ts_start = ts_start;
transfer.ts_end = ts_end;
e.accept = true;
return true;
}
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_data& e) {
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (!c) {
return false;
}
if (!c.all_of<Components::RunningChatLogs>()) {
return false; // not ours
}
auto& rnncl = c.get<Components::RunningChatLogs>();
if (!rnncl.list.count(e.transfer_id)) {
return false; // not ours
}
std::cout << "NGCHS2Rizzler: recv_data " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << " " << e.data_size << "@" << e.data_offset << "\n";
auto& transfer = rnncl.list.at(e.transfer_id);
transfer.data.resize(e.data_offset+e.data_size);
std::memcpy(&transfer.data[e.data_offset], e.data, e.data_size);
transfer.last_activity = 0.f;
return true;
}
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_done& e) {
// FIXME: this does not work, tcm just delteded the relation ship
//auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
//if (!c) {
// return false;
//}
const auto c_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
if (c_it == _tox_peer_to_contact.end()) {
return false;
}
auto c = c_it->second;
if (!static_cast<bool>(c)) {
return false;
}
if (!c.all_of<Components::RunningChatLogs>()) {
return false; // not ours
}
auto& rnncl = c.get<Components::RunningChatLogs>();
if (!rnncl.list.count(e.transfer_id)) {
return false; // not ours
}
std::cout << "NGCHS2Rizzler: recv_done " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
{
auto& transfer = rnncl.list.at(e.transfer_id);
// TODO: done might mean failed, so we might be parsing bs here
// use data
// TODO: move out of packet handler
handleMsgPack(c, transfer.data);
}
rnncl.list.erase(e.transfer_id);
return true;
}
bool NGCHS2Rizzler::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
const auto group_number = tox_event_group_peer_join_get_group_number(e);
const auto peer_number = tox_event_group_peer_join_get_peer_id(e);
const auto c = _tcm.getContactGroupPeer(group_number, peer_number);
if (!c) {
return false;
}
if (!_request_queue.count(c)) {
_request_queue[c] = {
_delay_before_first_request_min + _rng_dist(_rng)*_delay_before_first_request_add,
0.f,
0,
};
}
return false;
}

View File

@ -0,0 +1,73 @@
#pragma once
#include <solanaceae/contact/fwd.hpp>
#include <solanaceae/toxcore/tox_event_interface.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
#include <solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp>
// fwd
class ToxContactModel2;
class RegistryMessageModelI;
class NGCHS2Rizzler : public ToxEventI, public NGCFT1EventI {
ContactStore4I& _cs;
RegistryMessageModelI& _rmm;
ToxContactModel2& _tcm;
NGCFT1& _nft;
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
ToxEventProviderI::SubscriptionReference _tep_sr;
SHA1_NGCFT1& _sha1_nft;
// 5s-6s
const float _delay_before_first_request_min {5.f};
const float _delay_before_first_request_add {1.f};
std::uniform_real_distribution<float> _rng_dist {0.0f, 1.0f};
std::minstd_rand _rng;
struct RequestQueueInfo {
float delay; // const
float timer;
uint64_t sync_delta; //?
};
// request queue
// c -> delay, timer
std::map<Contact4, RequestQueueInfo> _request_queue;
// FIXME: workaround missing contact events
// only used on peer exit (no, also used to quicken lookups)
entt::dense_map<uint64_t, ContactHandle4> _tox_peer_to_contact;
public:
NGCHS2Rizzler(
ContactStore4I& cs,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
NGCFT1& nft,
ToxEventProviderI& tep,
SHA1_NGCFT1& sha1_nft
);
~NGCHS2Rizzler(void);
float iterate(float delta);
protected:
bool sendRequest(
uint32_t group_number, uint32_t peer_number,
uint64_t ts_start, uint64_t ts_end
);
void handleMsgPack(ContactHandle4 c, const std::vector<uint8_t>& data);
protected:
bool onEvent(const Events::NGCFT1_recv_init&) override;
bool onEvent(const Events::NGCFT1_recv_data&) override;
bool onEvent(const Events::NGCFT1_recv_done&) override;
protected:
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
};

View File

@ -1,7 +1,8 @@
#include "./ngc_hs2_send.hpp"
#include "./ngc_hs2_sigma.hpp"
#include <solanaceae/util/span.hpp>
#include <solanaceae/contact/contact_store_i.hpp>
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
#include <solanaceae/contact/components.hpp>
@ -15,37 +16,68 @@
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
#include <solanaceae/ngc_ft1_sha1/components.hpp>
// TODO: move somewhere else?
#include <solanaceae/ngc_ft1_sha1/util.hpp>
#include <nlohmann/json.hpp>
#include "./serl.hpp"
#include "./ts_find_start.hpp"
#include <iostream>
// https://www.youtube.com/watch?v=AdAqsgga3qo
// TODO: move to own file
namespace Components {
void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid) {
// TODO: do more than exact dedupe
for (const auto& [time_range, _] : _queue) {
if (time_range.ts_start == new_request.ts_start && time_range.ts_end == new_request.ts_end) {
return; // already enqueued
}
}
struct IncommingTimeRangeRequestQueue {
struct Entry {
TimeRangeRequest ir;
std::vector<uint8_t> fid;
};
std::deque<Entry> _queue;
_queue.emplace_back(Entry{new_request, std::vector<uint8_t>{fid.cbegin(), fid.cend()}});
}
// we should remove/notadd queued requests
// that are subsets of same or larger ranges
void queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid);
};
struct IncommingTimeRangeRequestRunning {
struct Entry {
TimeRangeRequest ir;
std::vector<uint8_t> data; // transfer data in memory
float last_activity {0.f};
};
entt::dense_map<uint8_t, Entry> _list;
};
void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid) {
// TODO: do more than exact dedupe
for (const auto& [time_range, _] : _queue) {
if (time_range.ts_start == new_request.ts_start && time_range.ts_end == new_request.ts_end) {
return; // already enqueued
// TODO: what about fid?
}
}
_queue.emplace_back(Entry{
new_request,
std::vector<uint8_t>{fid.cbegin(), fid.cend()}
});
}
} // Components
NGCHS2Send::NGCHS2Send(
Contact3Registry& cr,
NGCHS2Sigma::NGCHS2Sigma(
ContactStore4I& cs,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
NGCFT1& nft
) :
_cr(cr),
_cs(cs),
_rmm(rmm),
_tcm(tcm),
_nft(nft),
@ -58,10 +90,10 @@ NGCHS2Send::NGCHS2Send(
;
}
NGCHS2Send::~NGCHS2Send(void) {
NGCHS2Sigma::~NGCHS2Sigma(void) {
}
float NGCHS2Send::iterate(float delta) {
float NGCHS2Sigma::iterate(float delta) {
// limit how often we update here (new fts usually)
if (_iterate_heat > 0.f) {
_iterate_heat -= delta;
@ -75,7 +107,12 @@ float NGCHS2Send::iterate(float delta) {
auto fn_iirq = [this](auto&& view) {
for (auto&& [cv, iirq] : view.each()) {
Contact3Handle c{_cr, cv};
if (iirq._queue.empty()) {
// TODO: remove comp?
continue;
}
ContactHandle4 c = _cs.contactHandle(cv);
auto& iirr = c.get_or_emplace<Components::IncommingTimeRangeRequestRunning>();
// dedup queued from running
@ -93,12 +130,13 @@ float NGCHS2Send::iterate(float delta) {
iirq._queue.pop_front();
continue; // how
}
const Contact3Handle group_c = {*c.registry(), c.get<Contact::Components::Parent>().parent};
const ContactHandle4 group_c = {*c.registry(), c.get<Contact::Components::Parent>().parent};
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
iirq._queue.pop_front();
continue;
}
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
_tox_peer_to_contact[combine_ids(group_number, peer_number)] = c; // cache
// TODO: check allowed range here
//_max_time_into_past_default
@ -129,15 +167,17 @@ float NGCHS2Send::iterate(float delta) {
}
};
auto& cr = _cs.registry();
// first handle range requests on weak self
fn_iirq(_cr.view<Components::IncommingTimeRangeRequestQueue, Contact::Components::TagSelfWeak>());
fn_iirq(cr.view<Components::IncommingTimeRangeRequestQueue, Contact::Components::TagSelfWeak>());
// we could stop here, if too much is already running
// then range on others
fn_iirq(_cr.view<Components::IncommingTimeRangeRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
fn_iirq(cr.view<Components::IncommingTimeRangeRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
_cr.view<Components::IncommingTimeRangeRequestRunning>().each(
cr.view<Components::IncommingTimeRangeRequestRunning>().each(
[delta](const auto cv, Components::IncommingTimeRangeRequestRunning& irr) {
std::vector<uint8_t> to_remove;
for (auto&& [ft_id, entry] : irr._list) {
@ -147,7 +187,7 @@ float NGCHS2Send::iterate(float delta) {
}
}
for (const auto it : to_remove) {
std::cout << "NGCHS2Send warning: timed out ." << (int)it << "\n";
std::cout << "NGCHS2Sigma warning: timed out ." << (int)it << "\n";
// TODO: need a way to tell ft?
irr._list.erase(it);
// technically we are not supposed to timeout and instead rely on the done event
@ -158,30 +198,7 @@ float NGCHS2Send::iterate(float delta) {
return 1000.f;
}
template<typename Type>
static uint64_t deserlSimpleType(ByteSpan bytes) {
if (bytes.size < sizeof(Type)) {
throw int(1);
}
Type value;
for (size_t i = 0; i < sizeof(Type); i++) {
value |= Type(bytes[i]) << (i*8);
}
return value;
}
static uint32_t deserlMID(ByteSpan mid_bytes) {
return deserlSimpleType<uint32_t>(mid_bytes);
}
static uint64_t deserlTS(ByteSpan ts_bytes) {
return deserlSimpleType<uint64_t>(ts_bytes);
}
void NGCHS2Send::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
void NGCHS2Sigma::handleTimeRange(ContactHandle4 c, const Events::NGCFT1_recv_request& e) {
ByteSpan fid{e.file_id, e.file_id_size};
// TODO: better size check
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
@ -205,6 +222,18 @@ void NGCHS2Send::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_req
return;
}
if (ts_end >= ts_start) {
std::cerr << "NGCHS2S error: end not < start\n";
return;
}
if (!c.all_of<Contact::Components::TagSelfWeak>()) {
// make sure we dont sync past the peers first appearance
if (const auto first_seen_ptr = c.try_get<Contact::Components::FirstSeen>(); first_seen_ptr != nullptr) {
ts_start = std::max(ts_start, first_seen_ptr->ts);
}
}
// dedupe insert into queue
// how much overlap do we allow?
c.get_or_emplace<Components::IncommingTimeRangeRequestQueue>().queueRequest(
@ -213,50 +242,7 @@ void NGCHS2Send::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_req
);
}
#if 0
void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
ByteSpan fid{e.file_id, e.file_id_size};
// TODO: better size check
if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) {
std::cerr << "NGCHS2S error: singlemessage not lange enough\n";
return;
}
ByteSpan ppk;
uint32_t mid {0};
uint64_t ts {0}; // deciseconds
// parse
try {
// - ppk
// TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32)
ppk = {fid.ptr, 32};
// - mid
ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)};
mid = deserlMID(mid_bytes);
// - ts
ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)};
ts = deserlTS(ts_bytes);
} catch (...) {
std::cerr << "NGCHS2S error: failed to parse singlemessage\n";
return;
}
// for queue, we need group, peer, msg_ppk, msg_mid, msg_ts
// dedupe insert into queue
c.get_or_emplace<Components::IncommingMsgRequestQueue>().queueRequest({
ppk,
mid,
ts,
});
}
#endif
std::vector<uint8_t> NGCHS2Send::buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end) {
std::vector<uint8_t> NGCHS2Sigma::buildChatLogFileRange(ContactHandle4 c, uint64_t ts_start, uint64_t ts_end) {
const Message3Registry* reg_ptr = static_cast<const RegistryMessageModelI&>(_rmm).get(c);
if (reg_ptr == nullptr) {
return {};
@ -269,6 +255,13 @@ std::vector<uint8_t> NGCHS2Send::buildChatLogFileRange(Contact3Handle c, uint64_
return {};
}
std::cout << "NGCHS2Sigma: building chatlog for time range " << ts_start-ts_end << "s\n";
// convert seconds to milliseconds
// TODO: lift out?
ts_start *= 1000;
ts_end *= 1000;
//std::cout << "!!!! starting msg ts search, ts_start:" << ts_start << " ts_end:" << ts_end << "\n";
auto ts_view = msg_reg.view<Message::Components::Timestamp>();
@ -315,12 +308,12 @@ std::vector<uint8_t> NGCHS2Send::buildChatLogFileRange(Contact3Handle c, uint64_
continue;
}
if (!_cr.valid(c_from_c.c)) {
ContactHandle4 c_from = _cs.contactHandle(c_from_c.c);
if (!static_cast<bool>(c_from)) {
continue; // ???
}
Contact3Handle c_from{_cr, c_from_c.c};
if (!c_from.all_of<Contact::Components::ToxGroupPeerPersistent>()) {
continue; // ???
}
@ -340,12 +333,11 @@ std::vector<uint8_t> NGCHS2Send::buildChatLogFileRange(Contact3Handle c, uint64_
}
j_entry["mid"] = msg_reg.get<Message::Components::ToxGroupMessageID>(e).id;
if (msg_reg.all_of<Message::Components::TagMessageIsAction>(e)) {
j_entry["action"] = true;
}
if (msg_reg.all_of<Message::Components::MessageText>(e)) {
if (msg_reg.all_of<Message::Components::TagMessageIsAction>(e)) {
j_entry["msgtype"] = "action"; // TODO: textaction?
} else {
j_entry["msgtype"] = "text";
}
j_entry["text"] = msg_reg.get<Message::Components::MessageText>(e).text;
} else if (msg_reg.any_of<Message::Components::MessageFileObject>(e)) {
const auto& o = msg_reg.get<Message::Components::MessageFileObject>(e).o;
@ -365,22 +357,24 @@ std::vector<uint8_t> NGCHS2Send::buildChatLogFileRange(Contact3Handle c, uint64_
j_array.push_back(j_entry);
}
std::cout << "NGCHS2Sigma: built chat log with " << j_array.size() << " entries\n";
return nlohmann::json::to_msgpack(j_array);
}
bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) {
bool NGCHS2Sigma::onEvent(const Message::Events::MessageConstruct&) {
return false;
}
bool NGCHS2Send::onEvent(const Message::Events::MessageUpdated&) {
bool NGCHS2Sigma::onEvent(const Message::Events::MessageUpdated&) {
return false;
}
bool NGCHS2Send::onEvent(const Message::Events::MessageDestory&) {
bool NGCHS2Sigma::onEvent(const Message::Events::MessageDestory&) {
return false;
}
bool NGCHS2Send::onEvent(const Events::NGCFT1_recv_request& e) {
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_recv_request& e) {
if (
e.file_kind != NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK
) {
@ -415,7 +409,7 @@ bool NGCHS2Send::onEvent(const Events::NGCFT1_recv_request& e) {
return true;
}
bool NGCHS2Send::onEvent(const Events::NGCFT1_send_data& e) {
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_send_data& e) {
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (!c) {
return false;
@ -432,7 +426,7 @@ bool NGCHS2Send::onEvent(const Events::NGCFT1_send_data& e) {
auto& transfer = irr._list.at(e.transfer_id);
if (transfer.data.size() < e.data_offset+e.data_size) {
std::cerr << "NGCHS2Send error: ft send data larger then file???\n";
std::cerr << "NGCHS2Sigma error: ft send data larger then file???\n";
assert(false && "how");
}
std::memcpy(e.data, transfer.data.data()+e.data_offset, e.data_size);
@ -441,9 +435,19 @@ bool NGCHS2Send::onEvent(const Events::NGCFT1_send_data& e) {
return true;
}
bool NGCHS2Send::onEvent(const Events::NGCFT1_send_done& e) {
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (!c) {
bool NGCHS2Sigma::onEvent(const Events::NGCFT1_send_done& e) {
// TODO: this will return null if the peer just disconnected
// FIXME: this does not work, tcm just delteded the relation ship
//auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
//if (!c) {
// return false;
//}
const auto c_it = _tox_peer_to_contact.find(combine_ids(e.group_number, e.peer_number));
if (c_it == _tox_peer_to_contact.end()) {
return false;
}
auto c = c_it->second;
if (!static_cast<bool>(c)) {
return false;
}
@ -459,7 +463,7 @@ bool NGCHS2Send::onEvent(const Events::NGCFT1_send_done& e) {
irr._list.erase(e.transfer_id);
// TODO: check if we completed it
std::cout << "NGCHS2Send: sent chatlog to " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
std::cout << "NGCHS2Sigma: sent chatlog to " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
return true;
}

View File

@ -2,7 +2,7 @@
#include <solanaceae/toxcore/tox_event_interface.hpp>
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/contact/fwd.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
@ -23,32 +23,8 @@ struct TimeRangeRequest {
uint64_t ts_end{0};
};
// TODO: move to own file
namespace Components {
struct IncommingTimeRangeRequestQueue {
struct Entry {
TimeRangeRequest ir;
std::vector<uint8_t> fid;
};
std::deque<Entry> _queue;
// we should remove/notadd queued requests
// that are subsets of same or larger ranges
void queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid);
};
struct IncommingTimeRangeRequestRunning {
struct Entry {
TimeRangeRequest ir;
std::vector<uint8_t> data; // transfer data in memory
float last_activity {0.f};
};
entt::dense_map<uint8_t, Entry> _list;
};
} // Components
class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
Contact3Registry& _cr;
class NGCHS2Sigma : public RegistryMessageModelEventI, public NGCFT1EventI {
ContactStore4I& _cs;
RegistryMessageModelI& _rmm;
ToxContactModel2& _tcm;
NGCFT1& _nft;
@ -71,23 +47,27 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
constexpr static bool _only_send_self_observed {true};
constexpr static int64_t _max_time_into_past_default {60*15}; // s
// FIXME: workaround missing contact events
// only used on peer exit (no, also used to quicken lookups)
entt::dense_map<uint64_t, ContactHandle4> _tox_peer_to_contact;
public:
NGCHS2Send(
Contact3Registry& cr,
NGCHS2Sigma(
ContactStore4I& cs,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
NGCFT1& nf
NGCFT1& nft
);
~NGCHS2Send(void);
~NGCHS2Sigma(void);
float iterate(float delta);
void handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request&);
void handleTimeRange(ContactHandle4 c, const Events::NGCFT1_recv_request&);
// msg reg contact
// time ranges
[[nodiscard]] std::vector<uint8_t> buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end);
[[nodiscard]] std::vector<uint8_t> buildChatLogFileRange(ContactHandle4 c, uint64_t ts_start, uint64_t ts_end);
protected:
bool onEvent(const Message::Events::MessageConstruct&) override;

View File

@ -0,0 +1,32 @@
#pragma once
#include <solanaceae/util/span.hpp>
#include <cstdint>
template<typename Type>
static uint64_t deserlSimpleType(ByteSpan bytes) {
if (bytes.size < sizeof(Type)) {
throw int(1);
}
Type value{};
for (size_t i = 0; i < sizeof(Type); i++) {
value |= Type(bytes[i]) << (i*8);
}
return value;
}
static uint64_t deserlTS(ByteSpan ts_bytes) {
return deserlSimpleType<uint64_t>(ts_bytes);
}
template<typename Type>
static void serlSimpleType(std::vector<uint8_t>& bytes, const Type& value) {
for (size_t i = 0; i < sizeof(Type); i++) {
bytes.push_back(uint8_t(value >> (i*8) & 0xff));
}
}

View File

@ -91,8 +91,9 @@ Msgpack array of messages.
- ts | 64bit deciseconds
- ppk | 32bytes
- mid | 16bit
- msgtype | enum (string or number?)
- if text/action |
- if action |
- action | bool
- if text |
- text | string | maybe byte array instead?
- if file |
- fkind | 32bit enum | is this right?
@ -102,9 +103,6 @@ Msgpack array of messages.
Name is the actual string key.
Data type sizes are suggestions, if not defined by the tox protocol.
How unknown `msgtype`s are handled is client defined.
They can be fully ignored or displayed as broken.
## TODO
- [ ] figure out a pro-active approach (instead of waiting for a range request)

View File

@ -8,7 +8,7 @@
// perform binary search to find the first message not newer than ts_start
template<typename View>
auto find_start_by_ts(const View& view, uint64_t ts_start) {
std::cout << "!!!! starting msg ts search, ts_start:" << ts_start << "\n";
//std::cout << "!!!! starting msg ts search, ts_start:" << ts_start << "\n";
// -> first value smaller than start ts
auto res = std::lower_bound(
@ -22,9 +22,9 @@ auto find_start_by_ts(const View& view, uint64_t ts_start) {
if (res != view.end()) {
const auto& [ts_comp] = view.get(*res);
std::cout << "!!!! first value not newer than start ts is " << ts_comp.ts << "\n";
//std::cout << "!!!! first value not newer than start ts is " << ts_comp.ts << "\n";
} else {
std::cout << "!!!! no first value not newer than start ts\n";
//std::cout << "!!!! no first value not newer than start ts\n";
}
return res;
}