Compare commits
7 Commits
ignore_tim
...
34dc01d4dc
Author | SHA1 | Date | |
---|---|---|---|
34dc01d4dc | |||
1bf1fbce75 | |||
37b92f67c8 | |||
01c892df8c | |||
6eb5826616 | |||
2e6b15e4ad | |||
63de78aaeb |
@ -44,8 +44,10 @@ target_link_libraries(solanaceae_ngcft1 PUBLIC
|
||||
########################################
|
||||
|
||||
add_library(solanaceae_ngchs2
|
||||
./solanaceae/ngc_hs2/ngc_hs2.hpp
|
||||
./solanaceae/ngc_hs2/ngc_hs2.cpp
|
||||
./solanaceae/ngc_hs2/ngc_hs2_send.hpp
|
||||
./solanaceae/ngc_hs2/ngc_hs2_send.cpp
|
||||
./solanaceae/ngc_hs2/ngc_hs2_recv.hpp
|
||||
./solanaceae/ngc_hs2/ngc_hs2_recv.cpp
|
||||
)
|
||||
target_include_directories(solanaceae_ngchs2 PUBLIC .)
|
||||
target_compile_features(solanaceae_ngchs2 PUBLIC cxx_std_17)
|
||||
|
@ -12,7 +12,7 @@ struct FlowOnly : public CCAI {
|
||||
public: // config
|
||||
static constexpr float RTT_EMA_ALPHA = 0.001f; // might need change over time
|
||||
static constexpr float RTT_UP_MAX = 3.0f; // how much larger a delay can be to be taken into account
|
||||
static constexpr float RTT_MAX = 2.f; // 2 sec is probably too much
|
||||
static constexpr float RTT_MAX = 2.f; // maybe larger for tunneled connections
|
||||
|
||||
protected:
|
||||
// initialize to low value, will get corrected very fast
|
||||
|
@ -625,7 +625,8 @@ bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) {
|
||||
|
||||
// TODO: keep around for remote timeout + delay + offset, so we can be sure all acks where received
|
||||
// or implement a dedicated finished that needs to be acked
|
||||
transfer.finishing_timer = 0.75f; // TODO: we are receiving, we dont know delay
|
||||
//transfer.finishing_timer = 0.75f; // TODO: we are receiving, we dont know delay
|
||||
transfer.finishing_timer = FlowOnly::RTT_MAX;
|
||||
|
||||
dispatch(
|
||||
NGCFT1_Event::recv_done,
|
||||
|
@ -133,6 +133,7 @@ void ChunkPicker::updateParticipation(
|
||||
|
||||
entt::dense_set<Object> checked;
|
||||
for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) {
|
||||
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
|
||||
const ObjectHandle o {objreg, ov};
|
||||
|
||||
if (participating_unfinished.contains(o)) {
|
||||
@ -150,6 +151,21 @@ void ChunkPicker::updateParticipation(
|
||||
participating_unfinished.erase(o);
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: optimize this to only change on dirty, or something
|
||||
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
|
||||
Priority prio = o.get<ObjComp::Ephemeral::File::DownloadPriority>().p;
|
||||
|
||||
uint16_t pskips =
|
||||
prio == Priority::HIGHEST ? 0u :
|
||||
prio == Priority::HIGH ? 1u :
|
||||
prio == Priority::NORMAL ? 2u :
|
||||
prio == Priority::LOW ? 4u :
|
||||
8u // LOWEST
|
||||
;
|
||||
|
||||
participating_unfinished.at(o).should_skip = pskips;
|
||||
}
|
||||
} else {
|
||||
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
|
||||
continue;
|
||||
@ -160,8 +176,6 @@ void ChunkPicker::updateParticipation(
|
||||
}
|
||||
|
||||
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
|
||||
//using Priority = Components::DownloadPriority::Priority;
|
||||
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
|
||||
Priority prio = Priority::NORMAL;
|
||||
|
||||
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
|
||||
@ -239,13 +253,12 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
||||
// round robin content (remember last obj)
|
||||
if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
|
||||
participating_in_last = participating_unfinished.begin()->first;
|
||||
//participating_in_last = *participating_unfinished.begin();
|
||||
}
|
||||
assert(objreg.valid(participating_in_last));
|
||||
|
||||
auto it = participating_unfinished.find(participating_in_last);
|
||||
// hard limit robin rounds to array size time 20
|
||||
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++) {
|
||||
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++, it++) {
|
||||
if (it == participating_unfinished.end()) {
|
||||
it = participating_unfinished.begin();
|
||||
}
|
||||
@ -254,6 +267,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
||||
it->second.skips++;
|
||||
continue;
|
||||
}
|
||||
it->second.skips = 0;
|
||||
|
||||
ObjectHandle o {objreg, it->first};
|
||||
|
||||
@ -361,7 +375,8 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
||||
}
|
||||
}
|
||||
|
||||
if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) {
|
||||
//if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) {
|
||||
if (it == participating_unfinished.end()) {
|
||||
participating_in_last = entt::null;
|
||||
} else {
|
||||
participating_in_last = it->first;
|
||||
|
@ -1165,6 +1165,13 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: make perms go though contacts
|
||||
// TODO: consider temporal component? not here, here is now
|
||||
if (!_tcm.groupPeerCanSpeak(e.group_number, e.peer_number)) {
|
||||
// peer has not the permission to speak, discard
|
||||
return false; // return true?
|
||||
}
|
||||
|
||||
uint64_t ts = Message::getTimeMS();
|
||||
|
||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
|
@ -1,44 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
//#include <solanaceae/contact/contact_model3.hpp>
|
||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||
|
||||
//#include <solanaceae/message3/registry_message_model.hpp>
|
||||
|
||||
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||
|
||||
// fwd
|
||||
class ToxContactModel2;
|
||||
|
||||
class NGCHS2 : public ToxEventI, public NGCFT1EventI {
|
||||
ToxContactModel2& _tcm;
|
||||
//Contact3Registry& _cr;
|
||||
//RegistryMessageModelI& _rmm;
|
||||
ToxEventProviderI::SubscriptionReference _tep_sr;
|
||||
NGCFT1& _nft;
|
||||
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
|
||||
|
||||
public:
|
||||
NGCHS2(
|
||||
ToxContactModel2& tcm,
|
||||
ToxEventProviderI& tep,
|
||||
NGCFT1& nf
|
||||
);
|
||||
|
||||
~NGCHS2(void);
|
||||
|
||||
float iterate(float delta);
|
||||
|
||||
protected:
|
||||
bool onEvent(const Events::NGCFT1_recv_request&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_init&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_data&) override;
|
||||
bool onEvent(const Events::NGCFT1_send_data&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_done&) override;
|
||||
bool onEvent(const Events::NGCFT1_send_done&) override;
|
||||
|
||||
protected:
|
||||
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
|
||||
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;
|
||||
};
|
||||
|
@ -1,24 +1,34 @@
|
||||
#include "./ngc_hs2.hpp"
|
||||
#include "./ngc_hs2_recv.hpp"
|
||||
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
|
||||
NGCHS2::NGCHS2(
|
||||
NGCHS2Recv::NGCHS2Recv(
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModelI& rmm,
|
||||
ToxContactModel2& tcm,
|
||||
ToxEventProviderI& tep,
|
||||
NGCFT1& nft
|
||||
) :
|
||||
_cr(cr),
|
||||
_rmm(rmm),
|
||||
_rmm_sr(_rmm.newSubRef(this)),
|
||||
_tcm(tcm),
|
||||
_tep_sr(tep.newSubRef(this)),
|
||||
_nft(nft),
|
||||
_nftep_sr(_nft.newSubRef(this))
|
||||
{
|
||||
_rmm_sr
|
||||
.subscribe(RegistryMessageModel_Event::message_construct)
|
||||
.subscribe(RegistryMessageModel_Event::message_updated)
|
||||
.subscribe(RegistryMessageModel_Event::message_destroy)
|
||||
;
|
||||
|
||||
_tep_sr
|
||||
.subscribe(TOX_EVENT_GROUP_PEER_JOIN)
|
||||
.subscribe(TOX_EVENT_GROUP_PEER_EXIT)
|
||||
;
|
||||
|
||||
_nftep_sr
|
||||
.subscribe(NGCFT1_Event::recv_init)
|
||||
.subscribe(NGCFT1_Event::recv_request)
|
||||
.subscribe(NGCFT1_Event::recv_init)
|
||||
.subscribe(NGCFT1_Event::recv_data)
|
||||
@ -28,14 +38,26 @@ NGCHS2::NGCHS2(
|
||||
;
|
||||
}
|
||||
|
||||
NGCHS2::~NGCHS2(void) {
|
||||
NGCHS2Recv::~NGCHS2Recv(void) {
|
||||
}
|
||||
|
||||
float NGCHS2::iterate(float delta) {
|
||||
float NGCHS2Recv::iterate(float delta) {
|
||||
return 1000.f;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_recv_request& e) {
|
||||
bool NGCHS2Recv::onEvent(const Message::Events::MessageConstruct&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2Recv::onEvent(const Message::Events::MessageUpdated&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2Recv::onEvent(const Message::Events::MessageDestory&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_request& e) {
|
||||
if (
|
||||
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
|
||||
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
|
||||
@ -46,7 +68,7 @@ bool NGCHS2::onEvent(const Events::NGCFT1_recv_request& e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
if (
|
||||
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
|
||||
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
|
||||
@ -57,23 +79,23 @@ bool NGCHS2::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_recv_data&) {
|
||||
bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_data&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_send_data&) {
|
||||
bool NGCHS2Recv::onEvent(const Events::NGCFT1_send_data&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_recv_done&) {
|
||||
bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_done&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onEvent(const Events::NGCFT1_send_done&) {
|
||||
bool NGCHS2Recv::onEvent(const Events::NGCFT1_send_done&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
|
||||
bool NGCHS2Recv::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
|
||||
const auto group_number = tox_event_group_peer_join_get_group_number(e);
|
||||
const auto peer_number = tox_event_group_peer_join_get_peer_id(e);
|
||||
|
||||
@ -85,7 +107,7 @@ bool NGCHS2::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
|
||||
bool NGCHS2Recv::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
|
||||
return false;
|
||||
}
|
||||
|
83
solanaceae/ngc_hs2/ngc_hs2_recv.hpp
Normal file
83
solanaceae/ngc_hs2/ngc_hs2_recv.hpp
Normal file
@ -0,0 +1,83 @@
|
||||
#pragma once
|
||||
|
||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||
|
||||
#include <solanaceae/contact/contact_model3.hpp>
|
||||
#include <solanaceae/message3/registry_message_model.hpp>
|
||||
|
||||
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||
|
||||
#include <entt/container/dense_map.hpp>
|
||||
|
||||
// fwd
|
||||
class ToxContactModel2;
|
||||
|
||||
// time ranges
|
||||
// should we just do last x minutes like zngchs?
|
||||
// properly done, we need to use:
|
||||
// - Message::Components::ViewCurserBegin
|
||||
// - Message::Components::ViewCurserEnd
|
||||
//
|
||||
// on startup, manually check all registries for ranges (meh) (do later)
|
||||
// listen on message events, check if range, see if range satisfied recently
|
||||
// deal with a queue, and delay (at least 1sec, 3-10sec after a peer con change)
|
||||
// or we always overrequest (eg 48h), and only fetch messages in, or close to range
|
||||
|
||||
class NGCHS2Recv : public RegistryMessageModelEventI, public ToxEventI, public NGCFT1EventI {
|
||||
Contact3Registry& _cr;
|
||||
RegistryMessageModelI& _rmm;
|
||||
RegistryMessageModelI::SubscriptionReference _rmm_sr;
|
||||
ToxContactModel2& _tcm;
|
||||
ToxEventProviderI::SubscriptionReference _tep_sr;
|
||||
NGCFT1& _nft;
|
||||
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
|
||||
|
||||
// describes our knowlage of a remote peer
|
||||
struct RemoteInfo {
|
||||
// list of all ppk+mid+ts they sent us (filtered by reqs, like range, ppk...)
|
||||
// with when it last sent a range? hmm
|
||||
};
|
||||
entt::dense_map<Contact3, RemoteInfo> _remote_info;
|
||||
|
||||
// open/running info requests (by c)
|
||||
|
||||
// open/running info responses (by c)
|
||||
|
||||
static const bool _only_send_self_observed {true};
|
||||
static const int64_t _max_time_into_past_default {60}; // s
|
||||
|
||||
public:
|
||||
NGCHS2Recv(
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModelI& rmm,
|
||||
ToxContactModel2& tcm,
|
||||
ToxEventProviderI& tep,
|
||||
NGCFT1& nf
|
||||
);
|
||||
|
||||
~NGCHS2Recv(void);
|
||||
|
||||
float iterate(float delta);
|
||||
|
||||
// add to queue with timer
|
||||
// check and updates all existing cursers for giving reg in queue
|
||||
void enqueueWantCurser(Message3Handle m);
|
||||
|
||||
protected:
|
||||
bool onEvent(const Message::Events::MessageConstruct&) override;
|
||||
bool onEvent(const Message::Events::MessageUpdated&) override;
|
||||
bool onEvent(const Message::Events::MessageDestory&) override;
|
||||
|
||||
protected:
|
||||
bool onEvent(const Events::NGCFT1_recv_request&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_init&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_data&) override;
|
||||
bool onEvent(const Events::NGCFT1_send_data&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_done&) override;
|
||||
bool onEvent(const Events::NGCFT1_send_done&) override;
|
||||
|
||||
protected:
|
||||
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
|
||||
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;
|
||||
};
|
||||
|
286
solanaceae/ngc_hs2/ngc_hs2_send.cpp
Normal file
286
solanaceae/ngc_hs2/ngc_hs2_send.cpp
Normal file
@ -0,0 +1,286 @@
|
||||
#include "./ngc_hs2_send.hpp"
|
||||
|
||||
#include <solanaceae/util/span.hpp>
|
||||
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
|
||||
#include <solanaceae/contact/components.hpp>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
// https://www.youtube.com/watch?v=AdAqsgga3qo
|
||||
|
||||
namespace Components {
|
||||
|
||||
void IncommingInfoRequestQueue::queueRequest(const InfoRequest& new_request) {
|
||||
// TODO: do more than exact dedupe
|
||||
for (const auto& [ts_start, ts_end] : _queue) {
|
||||
if (ts_start == new_request.ts_start && ts_end == new_request.ts_end) {
|
||||
return; // already enqueued
|
||||
}
|
||||
}
|
||||
|
||||
_queue.push_back(new_request);
|
||||
}
|
||||
|
||||
void IncommingMsgRequestQueue::queueRequest(const SingleMessageRequest& new_request) {
|
||||
for (const auto& [ppk, mid, ts] : _queue) {
|
||||
if (mid == new_request.mid && ts == new_request.ts && ppk == new_request.ppk) {
|
||||
return; // already enqueued
|
||||
}
|
||||
}
|
||||
_queue.push_back(new_request);
|
||||
}
|
||||
|
||||
} // Components
|
||||
|
||||
|
||||
NGCHS2Send::NGCHS2Send(
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModelI& rmm,
|
||||
ToxContactModel2& tcm,
|
||||
NGCFT1& nft
|
||||
) :
|
||||
_cr(cr),
|
||||
_rmm(rmm),
|
||||
_tcm(tcm),
|
||||
_nft(nft),
|
||||
_nftep_sr(_nft.newSubRef(this))
|
||||
{
|
||||
_nftep_sr
|
||||
.subscribe(NGCFT1_Event::recv_request)
|
||||
//.subscribe(NGCFT1_Event::recv_init) // we only send init
|
||||
//.subscribe(NGCFT1_Event::recv_data) // we only send data
|
||||
.subscribe(NGCFT1_Event::send_data)
|
||||
//.subscribe(NGCFT1_Event::recv_done)
|
||||
.subscribe(NGCFT1_Event::send_done)
|
||||
;
|
||||
}
|
||||
|
||||
NGCHS2Send::~NGCHS2Send(void) {
|
||||
}
|
||||
|
||||
float NGCHS2Send::iterate(float delta) {
|
||||
// limit how often we update here (new fts usually)
|
||||
if (_iterate_heat > 0.f) {
|
||||
_iterate_heat -= delta;
|
||||
return 1000.f;
|
||||
} else {
|
||||
_iterate_heat = _iterate_cooldown;
|
||||
}
|
||||
|
||||
// work request queue
|
||||
// check if already running, discard
|
||||
|
||||
auto fn_iirq = [this](auto&& view) {
|
||||
for (auto&& [cv, iirq] : view.each()) {
|
||||
Contact3Handle c{_cr, cv};
|
||||
auto& iirr = c.get_or_emplace<Components::IncommingInfoRequestRunning>();
|
||||
|
||||
// dedup queued from running
|
||||
|
||||
if (iirr._list.size() >= _max_parallel_per_peer) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// new ft here?
|
||||
}
|
||||
};
|
||||
|
||||
auto fn_imrq = [this](auto&& view) {
|
||||
for (auto&& [cv, imrq] : view.each()) {
|
||||
Contact3Handle c{_cr, cv};
|
||||
auto& imrr = c.get_or_emplace<Components::IncommingMsgRequestRunning>();
|
||||
|
||||
// dedup queued from running
|
||||
|
||||
if (imrr._list.size() >= _max_parallel_per_peer) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// new ft here?
|
||||
}
|
||||
};
|
||||
|
||||
// first handle range requests on weak self
|
||||
//for (auto&& [cv, iirq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>().each()) {
|
||||
fn_iirq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>());
|
||||
|
||||
// then handle messages on weak self
|
||||
//for (auto&& [cv, imrq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>().each()) {
|
||||
fn_imrq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>());
|
||||
|
||||
// we could stop here, if too much is already running
|
||||
|
||||
// then range on others
|
||||
//for (auto&& [cv, iirq] : _cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
|
||||
fn_iirq(_cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
|
||||
|
||||
// then messages on others
|
||||
//for (auto&& [cv, imrq] : _cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
|
||||
fn_imrq(_cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
|
||||
|
||||
return 1000.f;
|
||||
}
|
||||
|
||||
template<typename Type>
|
||||
static uint64_t deserlSimpleType(ByteSpan bytes) {
|
||||
if (bytes.size < sizeof(Type)) {
|
||||
throw int(1);
|
||||
}
|
||||
|
||||
Type value;
|
||||
|
||||
for (size_t i = 0; i < sizeof(Type); i++) {
|
||||
value |= Type(bytes[i]) << (i*8);
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
static uint32_t deserlMID(ByteSpan mid_bytes) {
|
||||
return deserlSimpleType<uint32_t>(mid_bytes);
|
||||
}
|
||||
|
||||
static uint64_t deserlTS(ByteSpan ts_bytes) {
|
||||
return deserlSimpleType<uint64_t>(ts_bytes);
|
||||
}
|
||||
|
||||
void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
|
||||
ByteSpan fid{e.file_id, e.file_id_size};
|
||||
// TODO: better size check
|
||||
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
|
||||
std::cerr << "NGCHS2S error: range not lange enough\n";
|
||||
return;
|
||||
}
|
||||
|
||||
// seconds
|
||||
uint64_t ts_start{0};
|
||||
uint64_t ts_end{0};
|
||||
|
||||
// parse
|
||||
try {
|
||||
ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)};
|
||||
ts_start = deserlTS(ts_start_bytes);
|
||||
|
||||
ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)};
|
||||
ts_end = deserlTS(ts_end_bytes);
|
||||
} catch (...) {
|
||||
std::cerr << "NGCHS2S error: failed to parse range\n";
|
||||
return;
|
||||
}
|
||||
|
||||
// dedupe insert into queue
|
||||
// how much overlap do we allow?
|
||||
c.get_or_emplace<Components::IncommingInfoRequestQueue>().queueRequest({
|
||||
ts_start,
|
||||
ts_end,
|
||||
});
|
||||
}
|
||||
|
||||
void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
|
||||
ByteSpan fid{e.file_id, e.file_id_size};
|
||||
// TODO: better size check
|
||||
if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) {
|
||||
std::cerr << "NGCHS2S error: singlemessage not lange enough\n";
|
||||
return;
|
||||
}
|
||||
|
||||
ByteSpan ppk;
|
||||
uint32_t mid {0};
|
||||
uint64_t ts {0}; // deciseconds
|
||||
|
||||
// parse
|
||||
try {
|
||||
// - ppk
|
||||
// TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32)
|
||||
ppk = {fid.ptr, 32};
|
||||
|
||||
// - mid
|
||||
ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)};
|
||||
mid = deserlMID(mid_bytes);
|
||||
|
||||
// - ts
|
||||
ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)};
|
||||
ts = deserlTS(ts_bytes);
|
||||
} catch (...) {
|
||||
std::cerr << "NGCHS2S error: failed to parse singlemessage\n";
|
||||
return;
|
||||
}
|
||||
|
||||
// file content
|
||||
// - message type (text/textaction/file(ft1sha1))
|
||||
// - if text/textaction
|
||||
// - text (string)
|
||||
// - else if file
|
||||
// - file type
|
||||
// - file id
|
||||
|
||||
// for queue, we need group, peer, msg_ppk, msg_mid, msg_ts
|
||||
|
||||
// dedupe insert into queue
|
||||
c.get_or_emplace<Components::IncommingMsgRequestQueue>().queueRequest({
|
||||
ppk,
|
||||
mid,
|
||||
ts,
|
||||
});
|
||||
}
|
||||
|
||||
bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2Send::onEvent(const Message::Events::MessageUpdated&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2Send::onEvent(const Message::Events::MessageDestory&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2Send::onEvent(const Events::NGCFT1_recv_request& e) {
|
||||
if (
|
||||
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
|
||||
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
|
||||
) {
|
||||
return false; // not for us
|
||||
}
|
||||
|
||||
// TODO: when is it done from queue?
|
||||
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
if (!c) {
|
||||
return false; // how
|
||||
}
|
||||
|
||||
// is other peer allowed to make requests
|
||||
//bool quick_allow {false};
|
||||
bool quick_allow {true}; // HACK: disable all restrictions for this early test
|
||||
// TODO: quick deny?
|
||||
{
|
||||
// - tagged as weakself
|
||||
if (!quick_allow && c.all_of<Contact::Components::TagSelfWeak>()) {
|
||||
quick_allow = true;
|
||||
}
|
||||
|
||||
// - sub perm level??
|
||||
// - out of max time range (ft specific, not a quick_allow)
|
||||
}
|
||||
|
||||
if (e.file_kind == NGCFT1_file_kind::HS2_INFO_RANGE_TIME) {
|
||||
handleRange(c, e);
|
||||
} else if (e.file_kind == NGCFT1_file_kind::HS2_SINGLE_MESSAGE) {
|
||||
handleSingleMessage(c, e);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool NGCHS2Send::onEvent(const Events::NGCFT1_send_data&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool NGCHS2Send::onEvent(const Events::NGCFT1_send_done&) {
|
||||
return false;
|
||||
}
|
||||
|
116
solanaceae/ngc_hs2/ngc_hs2_send.hpp
Normal file
116
solanaceae/ngc_hs2/ngc_hs2_send.hpp
Normal file
@ -0,0 +1,116 @@
|
||||
#pragma once
|
||||
|
||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||
|
||||
#include <solanaceae/contact/contact_model3.hpp>
|
||||
#include <solanaceae/message3/registry_message_model.hpp>
|
||||
|
||||
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||
|
||||
#include <entt/container/dense_map.hpp>
|
||||
|
||||
#include <solanaceae/util/span.hpp>
|
||||
|
||||
#include <vector>
|
||||
|
||||
// fwd
|
||||
class ToxContactModel2;
|
||||
|
||||
|
||||
struct InfoRequest {
|
||||
uint64_t ts_start{0};
|
||||
uint64_t ts_end{0};
|
||||
};
|
||||
|
||||
struct SingleMessageRequest {
|
||||
ByteSpan ppk;
|
||||
uint32_t mid {0};
|
||||
uint64_t ts {0}; // deciseconds
|
||||
};
|
||||
|
||||
// TODO: move to own file
|
||||
namespace Components {
|
||||
struct IncommingInfoRequestQueue {
|
||||
std::vector<InfoRequest> _queue;
|
||||
|
||||
// we should remove/notadd queued requests
|
||||
// that are subsets of same or larger ranges
|
||||
void queueRequest(const InfoRequest& new_request);
|
||||
};
|
||||
|
||||
struct IncommingInfoRequestRunning {
|
||||
struct Entry {
|
||||
InfoRequest ir;
|
||||
std::vector<uint8_t> data; // trasfer data in memory
|
||||
};
|
||||
entt::dense_map<uint8_t, Entry> _list;
|
||||
};
|
||||
|
||||
struct IncommingMsgRequestQueue {
|
||||
// optimize dup lookups (this list could be large)
|
||||
std::vector<SingleMessageRequest> _queue;
|
||||
|
||||
// removes dups
|
||||
void queueRequest(const SingleMessageRequest& new_request);
|
||||
};
|
||||
|
||||
struct IncommingMsgRequestRunning {
|
||||
struct Entry {
|
||||
SingleMessageRequest smr;
|
||||
std::vector<uint8_t> data; // trasfer data in memory
|
||||
};
|
||||
// make more efficent? this list is very short
|
||||
entt::dense_map<uint8_t, Entry> _list;
|
||||
};
|
||||
} // Components
|
||||
|
||||
class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
|
||||
Contact3Registry& _cr;
|
||||
RegistryMessageModelI& _rmm;
|
||||
ToxContactModel2& _tcm;
|
||||
NGCFT1& _nft;
|
||||
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
|
||||
|
||||
float _iterate_heat {0.f};
|
||||
constexpr static float _iterate_cooldown {1.22f}; // sec
|
||||
|
||||
// open/running info requests (by c)
|
||||
// comp on peer c
|
||||
|
||||
// open/running info responses (by c)
|
||||
// comp on peer c
|
||||
|
||||
// limit to 2 uploads per peer simultaniously
|
||||
// TODO: increase for prod (4?)
|
||||
// currently per type
|
||||
constexpr static size_t _max_parallel_per_peer {2};
|
||||
|
||||
constexpr static bool _only_send_self_observed {true};
|
||||
constexpr static int64_t _max_time_into_past_default {60*15}; // s
|
||||
|
||||
public:
|
||||
NGCHS2Send(
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModelI& rmm,
|
||||
ToxContactModel2& tcm,
|
||||
NGCFT1& nf
|
||||
);
|
||||
|
||||
~NGCHS2Send(void);
|
||||
|
||||
float iterate(float delta);
|
||||
|
||||
void handleRange(Contact3Handle c, const Events::NGCFT1_recv_request&);
|
||||
void handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request&);
|
||||
|
||||
protected:
|
||||
bool onEvent(const Message::Events::MessageConstruct&) override;
|
||||
bool onEvent(const Message::Events::MessageUpdated&) override;
|
||||
bool onEvent(const Message::Events::MessageDestory&) override;
|
||||
|
||||
protected:
|
||||
bool onEvent(const Events::NGCFT1_recv_request&) override;
|
||||
bool onEvent(const Events::NGCFT1_send_data&) override;
|
||||
bool onEvent(const Events::NGCFT1_send_done&) override;
|
||||
};
|
||||
|
77
solanaceae/ngc_hs2/spec_ngc_hs2.md
Normal file
77
solanaceae/ngc_hs2/spec_ngc_hs2.md
Normal file
@ -0,0 +1,77 @@
|
||||
# [NGC] Group-History-Sync (v2) [PoC] [Draft]
|
||||
|
||||
Simple group history sync that uses `peer public key` + `message_id` + `timestamp` (`ppk+mid+ts`) to, mostly, uniquely identify messages and deliver them.
|
||||
|
||||
## Requirements
|
||||
|
||||
TODO
|
||||
|
||||
### File transfers
|
||||
|
||||
For sending packs of messages. A single message can be larger than a single custom packet, so this is a must-have.
|
||||
|
||||
## Procedure
|
||||
|
||||
Peer A can request `ppk+mid+ts` list for a given time range from peer B.
|
||||
|
||||
Peer B then sends a filetransfer (with special file type) of list of `ppk+mid+ts`.
|
||||
Optionally compressed. (Delta-coding / zstd)
|
||||
|
||||
Peer A keeps doing that until the desired time span is covered.
|
||||
|
||||
After that or simultaniously, Peer A requests messages from peer B, either indivitually, or packed? in ranges?.
|
||||
Optionally compressed.
|
||||
|
||||
During all that, peer B usually does the same thing to peer A.
|
||||
|
||||
## Traffic savings
|
||||
|
||||
It is recomended to remember if a range has been requested and answered from a given peer, to reduce traffic.
|
||||
|
||||
While compression is optional, it is recommended.
|
||||
|
||||
## Message uniqueness
|
||||
|
||||
This protocol relies on the randomness of `message_id` and the clocks to be more or less synchronized.
|
||||
|
||||
However, `message_id` can be manipulated freely by any peer, this can make messages appear as duplicates.
|
||||
|
||||
This can be used here, if you don't wish your messages to be syncronized (to an extent).
|
||||
|
||||
## Security
|
||||
|
||||
Only sync publicly sent/recieved messages.
|
||||
|
||||
Only allow sync or extended time ranges from peers you trust (enough).
|
||||
|
||||
The default shall be to not offer any messages.
|
||||
|
||||
Indirect messages shall be low in credibility, while direct synced (by author), with mid credibility.
|
||||
|
||||
Either only high or mid credibility shall be sent.
|
||||
|
||||
|
||||
Manual exceptions to all can be made at the users discretion, eg for other self owned devices.
|
||||
|
||||
## File transfer requests
|
||||
|
||||
TODO: is reusing the ft request api a good idea for this?
|
||||
|
||||
| fttype | name | content (ft id) |
|
||||
|------------|------|---------------------|
|
||||
| 0x00000f00 | time range | - ts start </br> - ts end </br> - supported compression? |
|
||||
| | TODO: id range based request? | |
|
||||
| 0x00000f01 | single message | - ppk </br> - mid </br> - ts |
|
||||
|
||||
## File transfers
|
||||
|
||||
| fttype | name | content |
|
||||
|------------|------|---------------------|
|
||||
| 0x00000f00 | time range | - feature bitset (1byte? different compressions?) </br> - ts start </br> - ts end </br> - list size </br> \\+ entry `ppk` </br> \\+ entry `mid` </br> \\+ entry `ts` |
|
||||
| 0x00000f01 | single message | - message type (text/textaction/file) </br> - text if text or action, file type and file id if file |
|
||||
|
||||
## TODO
|
||||
|
||||
- [ ] figure out a pro-active approach (instead of waiting for a range request)
|
||||
- [ ] compression in the ft layer? (would make it reusable) hint/autodetect?
|
||||
|
Reference in New Issue
Block a user