updated spec to 2.1, changed almost everything

other small fixes
This commit is contained in:
Green Sky 2024-11-28 12:39:57 +01:00
parent 34dc01d4dc
commit 741f1428d3
No known key found for this signature in database
9 changed files with 82 additions and 319 deletions

View File

@ -46,8 +46,8 @@ target_link_libraries(solanaceae_ngcft1 PUBLIC
add_library(solanaceae_ngchs2
./solanaceae/ngc_hs2/ngc_hs2_send.hpp
./solanaceae/ngc_hs2/ngc_hs2_send.cpp
./solanaceae/ngc_hs2/ngc_hs2_recv.hpp
./solanaceae/ngc_hs2/ngc_hs2_recv.cpp
#./solanaceae/ngc_hs2/ngc_hs2_recv.hpp
#./solanaceae/ngc_hs2/ngc_hs2_recv.cpp
)
target_include_directories(solanaceae_ngchs2 PUBLIC .)
target_compile_features(solanaceae_ngchs2 PUBLIC cxx_std_17)

View File

@ -56,7 +56,9 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
timeouts_set.erase({idx, id});
can_packet_size -= data.size();
} else {
#if 0 // too spammy
std::cerr << "NGCFT1 warning: no space to resend timedout\n";
#endif
}
}
});

View File

@ -75,20 +75,7 @@ enum class NGCFT1_file_kind : uint32_t {
// https://gist.github.com/Green-Sky/440cd9817a7114786850eb4c62dc57c3
// id: ts start, ts end
// content:
// - ts start (do we need this? when this is part of the id?)
// - ts end (same)
// - list size
// - ppk
// - mid
// - ts
HS2_INFO_RANGE_TIME = 0x00000f00,
// TODO: half open ranges
// TODO: id based
// TODO: ppk based?
// id: ppk, mid, ts
HS2_SINGLE_MESSAGE,
// TODO: message pack
HS2_RANGE_TIME = 0x00000f00, // TODO: remove, did not survive
HS2_RANGE_TIME_MSGPACK = 0x00000f02,
};

View File

@ -257,7 +257,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
assert(objreg.valid(participating_in_last));
auto it = participating_unfinished.find(participating_in_last);
// hard limit robin rounds to array size time 20
// hard limit robin rounds to array size times 20
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++, it++) {
if (it == participating_unfinished.end()) {
it = participating_unfinished.begin();
@ -375,7 +375,6 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
}
}
//if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) {
if (it == participating_unfinished.end()) {
participating_in_last = entt::null;
} else {

View File

@ -1,113 +0,0 @@
#include "./ngc_hs2_recv.hpp"
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
NGCHS2Recv::NGCHS2Recv(
Contact3Registry& cr,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
ToxEventProviderI& tep,
NGCFT1& nft
) :
_cr(cr),
_rmm(rmm),
_rmm_sr(_rmm.newSubRef(this)),
_tcm(tcm),
_tep_sr(tep.newSubRef(this)),
_nft(nft),
_nftep_sr(_nft.newSubRef(this))
{
_rmm_sr
.subscribe(RegistryMessageModel_Event::message_construct)
.subscribe(RegistryMessageModel_Event::message_updated)
.subscribe(RegistryMessageModel_Event::message_destroy)
;
_tep_sr
.subscribe(TOX_EVENT_GROUP_PEER_JOIN)
.subscribe(TOX_EVENT_GROUP_PEER_EXIT)
;
_nftep_sr
.subscribe(NGCFT1_Event::recv_request)
.subscribe(NGCFT1_Event::recv_init)
.subscribe(NGCFT1_Event::recv_data)
.subscribe(NGCFT1_Event::send_data)
.subscribe(NGCFT1_Event::recv_done)
.subscribe(NGCFT1_Event::send_done)
;
}
NGCHS2Recv::~NGCHS2Recv(void) {
}
float NGCHS2Recv::iterate(float delta) {
return 1000.f;
}
bool NGCHS2Recv::onEvent(const Message::Events::MessageConstruct&) {
return false;
}
bool NGCHS2Recv::onEvent(const Message::Events::MessageUpdated&) {
return false;
}
bool NGCHS2Recv::onEvent(const Message::Events::MessageDestory&) {
return false;
}
bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_request& e) {
if (
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
) {
return false; // not for us
}
return false;
}
bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_init& e) {
if (
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
) {
return false; // not for us
}
return false;
}
bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_data&) {
return false;
}
bool NGCHS2Recv::onEvent(const Events::NGCFT1_send_data&) {
return false;
}
bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_done&) {
return false;
}
bool NGCHS2Recv::onEvent(const Events::NGCFT1_send_done&) {
return false;
}
bool NGCHS2Recv::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
const auto group_number = tox_event_group_peer_join_get_group_number(e);
const auto peer_number = tox_event_group_peer_join_get_peer_id(e);
const auto c = _tcm.getContactGroupPeer(group_number, peer_number);
assert(c);
// add to check list with inital cooldown
return false;
}
bool NGCHS2Recv::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
return false;
}

View File

@ -1,83 +0,0 @@
#pragma once
#include <solanaceae/toxcore/tox_event_interface.hpp>
#include <solanaceae/contact/contact_model3.hpp>
#include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/ngc_ft1/ngcft1.hpp>
#include <entt/container/dense_map.hpp>
// fwd
class ToxContactModel2;
// time ranges
// should we just do last x minutes like zngchs?
// properly done, we need to use:
// - Message::Components::ViewCurserBegin
// - Message::Components::ViewCurserEnd
//
// on startup, manually check all registries for ranges (meh) (do later)
// listen on message events, check if range, see if range satisfied recently
// deal with a queue, and delay (at least 1sec, 3-10sec after a peer con change)
// or we always overrequest (eg 48h), and only fetch messages in, or close to range
class NGCHS2Recv : public RegistryMessageModelEventI, public ToxEventI, public NGCFT1EventI {
Contact3Registry& _cr;
RegistryMessageModelI& _rmm;
RegistryMessageModelI::SubscriptionReference _rmm_sr;
ToxContactModel2& _tcm;
ToxEventProviderI::SubscriptionReference _tep_sr;
NGCFT1& _nft;
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
// describes our knowlage of a remote peer
struct RemoteInfo {
// list of all ppk+mid+ts they sent us (filtered by reqs, like range, ppk...)
// with when it last sent a range? hmm
};
entt::dense_map<Contact3, RemoteInfo> _remote_info;
// open/running info requests (by c)
// open/running info responses (by c)
static const bool _only_send_self_observed {true};
static const int64_t _max_time_into_past_default {60}; // s
public:
NGCHS2Recv(
Contact3Registry& cr,
RegistryMessageModelI& rmm,
ToxContactModel2& tcm,
ToxEventProviderI& tep,
NGCFT1& nf
);
~NGCHS2Recv(void);
float iterate(float delta);
// add to queue with timer
// check and updates all existing cursers for giving reg in queue
void enqueueWantCurser(Message3Handle m);
protected:
bool onEvent(const Message::Events::MessageConstruct&) override;
bool onEvent(const Message::Events::MessageUpdated&) override;
bool onEvent(const Message::Events::MessageDestory&) override;
protected:
bool onEvent(const Events::NGCFT1_recv_request&) override;
bool onEvent(const Events::NGCFT1_recv_init&) override;
bool onEvent(const Events::NGCFT1_recv_data&) override;
bool onEvent(const Events::NGCFT1_send_data&) override;
bool onEvent(const Events::NGCFT1_recv_done&) override;
bool onEvent(const Events::NGCFT1_send_done&) override;
protected:
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;
};

View File

@ -12,7 +12,7 @@
namespace Components {
void IncommingInfoRequestQueue::queueRequest(const InfoRequest& new_request) {
void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request) {
// TODO: do more than exact dedupe
for (const auto& [ts_start, ts_end] : _queue) {
if (ts_start == new_request.ts_start && ts_end == new_request.ts_end) {
@ -23,15 +23,6 @@ void IncommingInfoRequestQueue::queueRequest(const InfoRequest& new_request) {
_queue.push_back(new_request);
}
void IncommingMsgRequestQueue::queueRequest(const SingleMessageRequest& new_request) {
for (const auto& [ppk, mid, ts] : _queue) {
if (mid == new_request.mid && ts == new_request.ts && ppk == new_request.ppk) {
return; // already enqueued
}
}
_queue.push_back(new_request);
}
} // Components
@ -75,7 +66,7 @@ float NGCHS2Send::iterate(float delta) {
auto fn_iirq = [this](auto&& view) {
for (auto&& [cv, iirq] : view.each()) {
Contact3Handle c{_cr, cv};
auto& iirr = c.get_or_emplace<Components::IncommingInfoRequestRunning>();
auto& iirr = c.get_or_emplace<Components::IncommingTimeRangeRequestRunning>();
// dedup queued from running
@ -87,38 +78,15 @@ float NGCHS2Send::iterate(float delta) {
}
};
auto fn_imrq = [this](auto&& view) {
for (auto&& [cv, imrq] : view.each()) {
Contact3Handle c{_cr, cv};
auto& imrr = c.get_or_emplace<Components::IncommingMsgRequestRunning>();
// dedup queued from running
if (imrr._list.size() >= _max_parallel_per_peer) {
continue;
}
// new ft here?
}
};
// first handle range requests on weak self
//for (auto&& [cv, iirq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>().each()) {
fn_iirq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>());
// then handle messages on weak self
//for (auto&& [cv, imrq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>().each()) {
fn_imrq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>());
//for (auto&& [cv, iirq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingTimeRangeRequestQueue>().each()) {
fn_iirq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingTimeRangeRequestQueue>());
// we could stop here, if too much is already running
// then range on others
//for (auto&& [cv, iirq] : _cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
fn_iirq(_cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
// then messages on others
//for (auto&& [cv, imrq] : _cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
fn_imrq(_cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
//for (auto&& [cv, iirq] : _cr.view<Components::IncommingTimeRangeRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
fn_iirq(_cr.view<Components::IncommingTimeRangeRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
return 1000.f;
}
@ -146,7 +114,7 @@ static uint64_t deserlTS(ByteSpan ts_bytes) {
return deserlSimpleType<uint64_t>(ts_bytes);
}
void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
void NGCHS2Send::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
ByteSpan fid{e.file_id, e.file_id_size};
// TODO: better size check
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
@ -172,12 +140,13 @@ void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request
// dedupe insert into queue
// how much overlap do we allow?
c.get_or_emplace<Components::IncommingInfoRequestQueue>().queueRequest({
c.get_or_emplace<Components::IncommingTimeRangeRequestQueue>().queueRequest({
ts_start,
ts_end,
});
}
#if 0
void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
ByteSpan fid{e.file_id, e.file_id_size};
// TODO: better size check
@ -208,13 +177,6 @@ void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv
return;
}
// file content
// - message type (text/textaction/file(ft1sha1))
// - if text/textaction
// - text (string)
// - else if file
// - file type
// - file id
// for queue, we need group, peer, msg_ppk, msg_mid, msg_ts
@ -225,6 +187,7 @@ void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv
ts,
});
}
#endif
bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) {
return false;
@ -240,8 +203,7 @@ bool NGCHS2Send::onEvent(const Message::Events::MessageDestory&) {
bool NGCHS2Send::onEvent(const Events::NGCFT1_recv_request& e) {
if (
e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME &&
e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE
e.file_kind != NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK
) {
return false; // not for us
}
@ -267,10 +229,8 @@ bool NGCHS2Send::onEvent(const Events::NGCFT1_recv_request& e) {
// - out of max time range (ft specific, not a quick_allow)
}
if (e.file_kind == NGCFT1_file_kind::HS2_INFO_RANGE_TIME) {
handleRange(c, e);
} else if (e.file_kind == NGCFT1_file_kind::HS2_SINGLE_MESSAGE) {
handleSingleMessage(c, e);
if (e.file_kind == NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK) {
handleTimeRange(c, e);
}
return true;

View File

@ -17,51 +17,28 @@
class ToxContactModel2;
struct InfoRequest {
struct TimeRangeRequest {
uint64_t ts_start{0};
uint64_t ts_end{0};
};
struct SingleMessageRequest {
ByteSpan ppk;
uint32_t mid {0};
uint64_t ts {0}; // deciseconds
};
// TODO: move to own file
namespace Components {
struct IncommingInfoRequestQueue {
std::vector<InfoRequest> _queue;
struct IncommingTimeRangeRequestQueue {
std::vector<TimeRangeRequest> _queue;
// we should remove/notadd queued requests
// that are subsets of same or larger ranges
void queueRequest(const InfoRequest& new_request);
void queueRequest(const TimeRangeRequest& new_request);
};
struct IncommingInfoRequestRunning {
struct IncommingTimeRangeRequestRunning {
struct Entry {
InfoRequest ir;
TimeRangeRequest ir;
std::vector<uint8_t> data; // trasfer data in memory
};
entt::dense_map<uint8_t, Entry> _list;
};
struct IncommingMsgRequestQueue {
// optimize dup lookups (this list could be large)
std::vector<SingleMessageRequest> _queue;
// removes dups
void queueRequest(const SingleMessageRequest& new_request);
};
struct IncommingMsgRequestRunning {
struct Entry {
SingleMessageRequest smr;
std::vector<uint8_t> data; // trasfer data in memory
};
// make more efficent? this list is very short
entt::dense_map<uint8_t, Entry> _list;
};
} // Components
class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
@ -81,7 +58,7 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
// comp on peer c
// limit to 2 uploads per peer simultaniously
// TODO: increase for prod (4?)
// TODO: increase for prod (4?) or maybe even lower?
// currently per type
constexpr static size_t _max_parallel_per_peer {2};
@ -100,8 +77,7 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
float iterate(float delta);
void handleRange(Contact3Handle c, const Events::NGCFT1_recv_request&);
void handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request&);
void handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request&);
protected:
bool onEvent(const Message::Events::MessageConstruct&) override;

View File

@ -1,34 +1,47 @@
# [NGC] Group-History-Sync (v2) [PoC] [Draft]
# [NGC] Group-History-Sync (v2.1) [PoC] [Draft]
Simple group history sync that uses `peer public key` + `message_id` + `timestamp` (`ppk+mid+ts`) to, mostly, uniquely identify messages and deliver them.
Simple group history sync that uses `timestamp` + `peer public key` + `message_id` (`ts+ppk+mid`) to, mostly, uniquely identify messages and deliver them.
Messages are bundled up in a `msgpack` `array` and sent as a file transfer.
## Requirements
TODO
TODO: more?
### Msgpack
For serializing the messages.
### File transfers
For sending packs of messages. A single message can be larger than a single custom packet, so this is a must-have.
For sending packs of messages.
Even a single message can be larger than a single custom packet, so this is a must-have.
This also allows for compression down the road.
## Procedure
Peer A can request `ppk+mid+ts` list for a given time range from peer B.
Peer A can request `ts+ppk+mid+msg` list for a given time range from peer B.
Peer B then sends a filetransfer (with special file type) of list of `ppk+mid+ts`.
Optionally compressed. (Delta-coding / zstd)
Peer B then sends a filetransfer (with special file type) of list of `ts+ppk+mid+msg`.
Optionally compressed. (Delta-coding? / zstd)
Peer A keeps doing that until the desired time span is covered.
After that or simultaniously, Peer A requests messages from peer B, either indivitually, or packed? in ranges?.
Optionally compressed.
During all that, peer B usually does the same thing to peer A.
TODO: deny request explicitly. also why (like perms and time range too large etc)
## Traffic savings
It is recomended to remember if a range has been requested and answered from a given peer, to reduce traffic.
While compression is optional, it is recommended.
Timestamps fit delta coding.
Peer keys fit dicts.
Message ids are mostly high entropy.
The Message itself is text, so dict/huffman fits well.
TODO: store the 4 coloms SoA instead of AoS ?
## Message uniqueness
@ -59,19 +72,41 @@ TODO: is reusing the ft request api a good idea for this?
| fttype | name | content (ft id) |
|------------|------|---------------------|
| 0x00000f00 | time range | - ts start </br> - ts end </br> - supported compression? |
| | TODO: id range based request? | |
| 0x00000f01 | single message | - ppk </br> - mid </br> - ts |
| 0x00000f02 | time range msgpack | - ts start </br> - ts end |
## File transfers
## File transfer content
| fttype | name | content |
|------------|------|---------------------|
| 0x00000f00 | time range | - feature bitset (1byte? different compressions?) </br> - ts start </br> - ts end </br> - list size </br> \\+ entry `ppk` </br> \\+ entry `mid` </br> \\+ entry `ts` |
| 0x00000f01 | single message | - message type (text/textaction/file) </br> - text if text or action, file type and file id if file |
| fttype | name | content | note |
|------------|------|----------------------------|---|
| 0x00000f02 | time range msgpack | `message list` in msgpack | |
### time range msgpack
Msgpack array of messages.
```
name | type/size | note
-------------------------|-------------------|-----
- array | 32bit number msgs
- ts | 64bit deciseconds
- ppk | 32bytes
- mid | 16bit
- msgtype | enum (string or number?)
- if text/textaction |
- text | string | maybe byte array instead?
- if file |
- fkind | 32bit enum | is this right?
- fid | bytes kind | length depends on kind
```
Name is the actual string key.
Data type sizes are suggestions, if not defined by the tox protocol.
How unknown `msgtype`s are handled is client defined.
They can be fully ignored or displayed as broken.
## TODO
- [ ] figure out a pro-active approach (instead of waiting for a range request)
- [ ] compression in the ft layer? (would make it reusable) hint/autodetect?
- [ ] compression in the ft layer? (would make it reusable) hint/autodetect/autoenable for >1k ?