diff --git a/CMakeLists.txt b/CMakeLists.txt index 7380887..87f5bb1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,8 +46,8 @@ target_link_libraries(solanaceae_ngcft1 PUBLIC add_library(solanaceae_ngchs2 ./solanaceae/ngc_hs2/ngc_hs2_send.hpp ./solanaceae/ngc_hs2/ngc_hs2_send.cpp - ./solanaceae/ngc_hs2/ngc_hs2_recv.hpp - ./solanaceae/ngc_hs2/ngc_hs2_recv.cpp + #./solanaceae/ngc_hs2/ngc_hs2_recv.hpp + #./solanaceae/ngc_hs2/ngc_hs2_recv.cpp ) target_include_directories(solanaceae_ngchs2 PUBLIC .) target_compile_features(solanaceae_ngchs2 PUBLIC cxx_std_17) diff --git a/solanaceae/ngc_ft1/ngcft1.cpp b/solanaceae/ngc_ft1/ngcft1.cpp index c65e83d..d51b383 100644 --- a/solanaceae/ngc_ft1/ngcft1.cpp +++ b/solanaceae/ngc_ft1/ngcft1.cpp @@ -56,7 +56,9 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ timeouts_set.erase({idx, id}); can_packet_size -= data.size(); } else { +#if 0 // too spammy std::cerr << "NGCFT1 warning: no space to resend timedout\n"; +#endif } } }); diff --git a/solanaceae/ngc_ft1/ngcft1_file_kind.hpp b/solanaceae/ngc_ft1/ngcft1_file_kind.hpp index b4936c0..cc92488 100644 --- a/solanaceae/ngc_ft1/ngcft1_file_kind.hpp +++ b/solanaceae/ngc_ft1/ngcft1_file_kind.hpp @@ -75,20 +75,7 @@ enum class NGCFT1_file_kind : uint32_t { // https://gist.github.com/Green-Sky/440cd9817a7114786850eb4c62dc57c3 // id: ts start, ts end - // content: - // - ts start (do we need this? when this is part of the id?) - // - ts end (same) - // - list size - // - ppk - // - mid - // - ts - HS2_INFO_RANGE_TIME = 0x00000f00, - // TODO: half open ranges - // TODO: id based - // TODO: ppk based? - - // id: ppk, mid, ts - HS2_SINGLE_MESSAGE, - // TODO: message pack + HS2_RANGE_TIME = 0x00000f00, // TODO: remove, did not survive + HS2_RANGE_TIME_MSGPACK = 0x00000f02, }; diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp index 758493a..2288741 100644 --- a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp +++ b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp @@ -257,7 +257,7 @@ std::vector ChunkPicker::updateChunkRequests( assert(objreg.valid(participating_in_last)); auto it = participating_unfinished.find(participating_in_last); - // hard limit robin rounds to array size time 20 + // hard limit robin rounds to array size times 20 for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++, it++) { if (it == participating_unfinished.end()) { it = participating_unfinished.begin(); @@ -375,7 +375,6 @@ std::vector ChunkPicker::updateChunkRequests( } } - //if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) { if (it == participating_unfinished.end()) { participating_in_last = entt::null; } else { diff --git a/solanaceae/ngc_hs2/ngc_hs2_recv.cpp b/solanaceae/ngc_hs2/ngc_hs2_recv.cpp deleted file mode 100644 index a9631fa..0000000 --- a/solanaceae/ngc_hs2/ngc_hs2_recv.cpp +++ /dev/null @@ -1,113 +0,0 @@ -#include "./ngc_hs2_recv.hpp" - -#include - -NGCHS2Recv::NGCHS2Recv( - Contact3Registry& cr, - RegistryMessageModelI& rmm, - ToxContactModel2& tcm, - ToxEventProviderI& tep, - NGCFT1& nft -) : - _cr(cr), - _rmm(rmm), - _rmm_sr(_rmm.newSubRef(this)), - _tcm(tcm), - _tep_sr(tep.newSubRef(this)), - _nft(nft), - _nftep_sr(_nft.newSubRef(this)) -{ - _rmm_sr - .subscribe(RegistryMessageModel_Event::message_construct) - .subscribe(RegistryMessageModel_Event::message_updated) - .subscribe(RegistryMessageModel_Event::message_destroy) - ; - - _tep_sr - .subscribe(TOX_EVENT_GROUP_PEER_JOIN) - .subscribe(TOX_EVENT_GROUP_PEER_EXIT) - ; - - _nftep_sr - .subscribe(NGCFT1_Event::recv_request) - .subscribe(NGCFT1_Event::recv_init) - .subscribe(NGCFT1_Event::recv_data) - .subscribe(NGCFT1_Event::send_data) - .subscribe(NGCFT1_Event::recv_done) - .subscribe(NGCFT1_Event::send_done) - ; -} - -NGCHS2Recv::~NGCHS2Recv(void) { -} - -float NGCHS2Recv::iterate(float delta) { - return 1000.f; -} - -bool NGCHS2Recv::onEvent(const Message::Events::MessageConstruct&) { - return false; -} - -bool NGCHS2Recv::onEvent(const Message::Events::MessageUpdated&) { - return false; -} - -bool NGCHS2Recv::onEvent(const Message::Events::MessageDestory&) { - return false; -} - -bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_request& e) { - if ( - e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME && - e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE - ) { - return false; // not for us - } - - return false; -} - -bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_init& e) { - if ( - e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME && - e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE - ) { - return false; // not for us - } - - return false; -} - -bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_data&) { - return false; -} - -bool NGCHS2Recv::onEvent(const Events::NGCFT1_send_data&) { - return false; -} - -bool NGCHS2Recv::onEvent(const Events::NGCFT1_recv_done&) { - return false; -} - -bool NGCHS2Recv::onEvent(const Events::NGCFT1_send_done&) { - return false; -} - -bool NGCHS2Recv::onToxEvent(const Tox_Event_Group_Peer_Join* e) { - const auto group_number = tox_event_group_peer_join_get_group_number(e); - const auto peer_number = tox_event_group_peer_join_get_peer_id(e); - - const auto c = _tcm.getContactGroupPeer(group_number, peer_number); - assert(c); - - // add to check list with inital cooldown - - return false; -} - -bool NGCHS2Recv::onToxEvent(const Tox_Event_Group_Peer_Exit* e) { - return false; -} - diff --git a/solanaceae/ngc_hs2/ngc_hs2_recv.hpp b/solanaceae/ngc_hs2/ngc_hs2_recv.hpp deleted file mode 100644 index ed994ed..0000000 --- a/solanaceae/ngc_hs2/ngc_hs2_recv.hpp +++ /dev/null @@ -1,83 +0,0 @@ -#pragma once - -#include - -#include -#include - -#include - -#include - -// fwd -class ToxContactModel2; - -// time ranges -// should we just do last x minutes like zngchs? -// properly done, we need to use: -// - Message::Components::ViewCurserBegin -// - Message::Components::ViewCurserEnd -// -// on startup, manually check all registries for ranges (meh) (do later) -// listen on message events, check if range, see if range satisfied recently -// deal with a queue, and delay (at least 1sec, 3-10sec after a peer con change) -// or we always overrequest (eg 48h), and only fetch messages in, or close to range - -class NGCHS2Recv : public RegistryMessageModelEventI, public ToxEventI, public NGCFT1EventI { - Contact3Registry& _cr; - RegistryMessageModelI& _rmm; - RegistryMessageModelI::SubscriptionReference _rmm_sr; - ToxContactModel2& _tcm; - ToxEventProviderI::SubscriptionReference _tep_sr; - NGCFT1& _nft; - NGCFT1EventProviderI::SubscriptionReference _nftep_sr; - - // describes our knowlage of a remote peer - struct RemoteInfo { - // list of all ppk+mid+ts they sent us (filtered by reqs, like range, ppk...) - // with when it last sent a range? hmm - }; - entt::dense_map _remote_info; - - // open/running info requests (by c) - - // open/running info responses (by c) - - static const bool _only_send_self_observed {true}; - static const int64_t _max_time_into_past_default {60}; // s - - public: - NGCHS2Recv( - Contact3Registry& cr, - RegistryMessageModelI& rmm, - ToxContactModel2& tcm, - ToxEventProviderI& tep, - NGCFT1& nf - ); - - ~NGCHS2Recv(void); - - float iterate(float delta); - - // add to queue with timer - // check and updates all existing cursers for giving reg in queue - void enqueueWantCurser(Message3Handle m); - - protected: - bool onEvent(const Message::Events::MessageConstruct&) override; - bool onEvent(const Message::Events::MessageUpdated&) override; - bool onEvent(const Message::Events::MessageDestory&) override; - - protected: - bool onEvent(const Events::NGCFT1_recv_request&) override; - bool onEvent(const Events::NGCFT1_recv_init&) override; - bool onEvent(const Events::NGCFT1_recv_data&) override; - bool onEvent(const Events::NGCFT1_send_data&) override; - bool onEvent(const Events::NGCFT1_recv_done&) override; - bool onEvent(const Events::NGCFT1_send_done&) override; - - protected: - bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override; - bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override; -}; - diff --git a/solanaceae/ngc_hs2/ngc_hs2_send.cpp b/solanaceae/ngc_hs2/ngc_hs2_send.cpp index c83045e..dbf8e96 100644 --- a/solanaceae/ngc_hs2/ngc_hs2_send.cpp +++ b/solanaceae/ngc_hs2/ngc_hs2_send.cpp @@ -12,7 +12,7 @@ namespace Components { -void IncommingInfoRequestQueue::queueRequest(const InfoRequest& new_request) { +void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request) { // TODO: do more than exact dedupe for (const auto& [ts_start, ts_end] : _queue) { if (ts_start == new_request.ts_start && ts_end == new_request.ts_end) { @@ -23,15 +23,6 @@ void IncommingInfoRequestQueue::queueRequest(const InfoRequest& new_request) { _queue.push_back(new_request); } -void IncommingMsgRequestQueue::queueRequest(const SingleMessageRequest& new_request) { - for (const auto& [ppk, mid, ts] : _queue) { - if (mid == new_request.mid && ts == new_request.ts && ppk == new_request.ppk) { - return; // already enqueued - } - } - _queue.push_back(new_request); -} - } // Components @@ -75,7 +66,7 @@ float NGCHS2Send::iterate(float delta) { auto fn_iirq = [this](auto&& view) { for (auto&& [cv, iirq] : view.each()) { Contact3Handle c{_cr, cv}; - auto& iirr = c.get_or_emplace(); + auto& iirr = c.get_or_emplace(); // dedup queued from running @@ -87,38 +78,15 @@ float NGCHS2Send::iterate(float delta) { } }; - auto fn_imrq = [this](auto&& view) { - for (auto&& [cv, imrq] : view.each()) { - Contact3Handle c{_cr, cv}; - auto& imrr = c.get_or_emplace(); - - // dedup queued from running - - if (imrr._list.size() >= _max_parallel_per_peer) { - continue; - } - - // new ft here? - } - }; - // first handle range requests on weak self - //for (auto&& [cv, iirq] : _cr.view().each()) { - fn_iirq(_cr.view()); - - // then handle messages on weak self - //for (auto&& [cv, imrq] : _cr.view().each()) { - fn_imrq(_cr.view()); + //for (auto&& [cv, iirq] : _cr.view().each()) { + fn_iirq(_cr.view()); // we could stop here, if too much is already running // then range on others - //for (auto&& [cv, iirq] : _cr.view(entt::exclude_t{}).each()) { - fn_iirq(_cr.view(entt::exclude_t{})); - - // then messages on others - //for (auto&& [cv, imrq] : _cr.view(entt::exclude_t{}).each()) { - fn_imrq(_cr.view(entt::exclude_t{})); + //for (auto&& [cv, iirq] : _cr.view(entt::exclude_t{}).each()) { + fn_iirq(_cr.view(entt::exclude_t{})); return 1000.f; } @@ -146,7 +114,7 @@ static uint64_t deserlTS(ByteSpan ts_bytes) { return deserlSimpleType(ts_bytes); } -void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) { +void NGCHS2Send::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) { ByteSpan fid{e.file_id, e.file_id_size}; // TODO: better size check if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) { @@ -172,12 +140,13 @@ void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request // dedupe insert into queue // how much overlap do we allow? - c.get_or_emplace().queueRequest({ + c.get_or_emplace().queueRequest({ ts_start, ts_end, }); } +#if 0 void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) { ByteSpan fid{e.file_id, e.file_id_size}; // TODO: better size check @@ -208,13 +177,6 @@ void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv return; } - // file content - // - message type (text/textaction/file(ft1sha1)) - // - if text/textaction - // - text (string) - // - else if file - // - file type - // - file id // for queue, we need group, peer, msg_ppk, msg_mid, msg_ts @@ -225,6 +187,7 @@ void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv ts, }); } +#endif bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) { return false; @@ -240,8 +203,7 @@ bool NGCHS2Send::onEvent(const Message::Events::MessageDestory&) { bool NGCHS2Send::onEvent(const Events::NGCFT1_recv_request& e) { if ( - e.file_kind != NGCFT1_file_kind::HS2_INFO_RANGE_TIME && - e.file_kind != NGCFT1_file_kind::HS2_SINGLE_MESSAGE + e.file_kind != NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK ) { return false; // not for us } @@ -267,10 +229,8 @@ bool NGCHS2Send::onEvent(const Events::NGCFT1_recv_request& e) { // - out of max time range (ft specific, not a quick_allow) } - if (e.file_kind == NGCFT1_file_kind::HS2_INFO_RANGE_TIME) { - handleRange(c, e); - } else if (e.file_kind == NGCFT1_file_kind::HS2_SINGLE_MESSAGE) { - handleSingleMessage(c, e); + if (e.file_kind == NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK) { + handleTimeRange(c, e); } return true; diff --git a/solanaceae/ngc_hs2/ngc_hs2_send.hpp b/solanaceae/ngc_hs2/ngc_hs2_send.hpp index aa8b86b..a1657fc 100644 --- a/solanaceae/ngc_hs2/ngc_hs2_send.hpp +++ b/solanaceae/ngc_hs2/ngc_hs2_send.hpp @@ -17,51 +17,28 @@ class ToxContactModel2; -struct InfoRequest { +struct TimeRangeRequest { uint64_t ts_start{0}; uint64_t ts_end{0}; }; -struct SingleMessageRequest { - ByteSpan ppk; - uint32_t mid {0}; - uint64_t ts {0}; // deciseconds -}; - // TODO: move to own file namespace Components { - struct IncommingInfoRequestQueue { - std::vector _queue; + struct IncommingTimeRangeRequestQueue { + std::vector _queue; // we should remove/notadd queued requests // that are subsets of same or larger ranges - void queueRequest(const InfoRequest& new_request); + void queueRequest(const TimeRangeRequest& new_request); }; - struct IncommingInfoRequestRunning { + struct IncommingTimeRangeRequestRunning { struct Entry { - InfoRequest ir; + TimeRangeRequest ir; std::vector data; // trasfer data in memory }; entt::dense_map _list; }; - - struct IncommingMsgRequestQueue { - // optimize dup lookups (this list could be large) - std::vector _queue; - - // removes dups - void queueRequest(const SingleMessageRequest& new_request); - }; - - struct IncommingMsgRequestRunning { - struct Entry { - SingleMessageRequest smr; - std::vector data; // trasfer data in memory - }; - // make more efficent? this list is very short - entt::dense_map _list; - }; } // Components class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI { @@ -81,7 +58,7 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI { // comp on peer c // limit to 2 uploads per peer simultaniously - // TODO: increase for prod (4?) + // TODO: increase for prod (4?) or maybe even lower? // currently per type constexpr static size_t _max_parallel_per_peer {2}; @@ -100,8 +77,7 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI { float iterate(float delta); - void handleRange(Contact3Handle c, const Events::NGCFT1_recv_request&); - void handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request&); + void handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request&); protected: bool onEvent(const Message::Events::MessageConstruct&) override; diff --git a/solanaceae/ngc_hs2/spec_ngc_hs2.md b/solanaceae/ngc_hs2/spec_ngc_hs2.md index a1486f7..8071144 100644 --- a/solanaceae/ngc_hs2/spec_ngc_hs2.md +++ b/solanaceae/ngc_hs2/spec_ngc_hs2.md @@ -1,34 +1,47 @@ -# [NGC] Group-History-Sync (v2) [PoC] [Draft] +# [NGC] Group-History-Sync (v2.1) [PoC] [Draft] -Simple group history sync that uses `peer public key` + `message_id` + `timestamp` (`ppk+mid+ts`) to, mostly, uniquely identify messages and deliver them. +Simple group history sync that uses `timestamp` + `peer public key` + `message_id` (`ts+ppk+mid`) to, mostly, uniquely identify messages and deliver them. + +Messages are bundled up in a `msgpack` `array` and sent as a file transfer. ## Requirements -TODO +TODO: more? + +### Msgpack + +For serializing the messages. ### File transfers -For sending packs of messages. A single message can be larger than a single custom packet, so this is a must-have. +For sending packs of messages. +Even a single message can be larger than a single custom packet, so this is a must-have. +This also allows for compression down the road. ## Procedure -Peer A can request `ppk+mid+ts` list for a given time range from peer B. +Peer A can request `ts+ppk+mid+msg` list for a given time range from peer B. -Peer B then sends a filetransfer (with special file type) of list of `ppk+mid+ts`. -Optionally compressed. (Delta-coding / zstd) +Peer B then sends a filetransfer (with special file type) of list of `ts+ppk+mid+msg`. +Optionally compressed. (Delta-coding? / zstd) Peer A keeps doing that until the desired time span is covered. -After that or simultaniously, Peer A requests messages from peer B, either indivitually, or packed? in ranges?. -Optionally compressed. - During all that, peer B usually does the same thing to peer A. +TODO: deny request explicitly. also why (like perms and time range too large etc) + ## Traffic savings It is recomended to remember if a range has been requested and answered from a given peer, to reduce traffic. While compression is optional, it is recommended. +Timestamps fit delta coding. +Peer keys fit dicts. +Message ids are mostly high entropy. +The Message itself is text, so dict/huffman fits well. + +TODO: store the 4 coloms SoA instead of AoS ? ## Message uniqueness @@ -59,19 +72,41 @@ TODO: is reusing the ft request api a good idea for this? | fttype | name | content (ft id) | |------------|------|---------------------| -| 0x00000f00 | time range | - ts start
- ts end
- supported compression? | -| | TODO: id range based request? | | -| 0x00000f01 | single message | - ppk
- mid
- ts | +| 0x00000f02 | time range msgpack | - ts start
- ts end | -## File transfers +## File transfer content -| fttype | name | content | -|------------|------|---------------------| -| 0x00000f00 | time range | - feature bitset (1byte? different compressions?)
- ts start
- ts end
- list size
\\+ entry `ppk`
\\+ entry `mid`
\\+ entry `ts` | -| 0x00000f01 | single message | - message type (text/textaction/file)
- text if text or action, file type and file id if file | +| fttype | name | content | note | +|------------|------|----------------------------|---| +| 0x00000f02 | time range msgpack | `message list` in msgpack | | + +### time range msgpack + +Msgpack array of messages. + +``` + name | type/size | note +-------------------------|-------------------|----- +- array | 32bit number msgs + - ts | 64bit deciseconds + - ppk | 32bytes + - mid | 16bit + - msgtype | enum (string or number?) + - if text/textaction | + - text | string | maybe byte array instead? + - if file | + - fkind | 32bit enum | is this right? + - fid | bytes kind | length depends on kind +``` + +Name is the actual string key. +Data type sizes are suggestions, if not defined by the tox protocol. + +How unknown `msgtype`s are handled is client defined. +They can be fully ignored or displayed as broken. ## TODO - [ ] figure out a pro-active approach (instead of waiting for a range request) -- [ ] compression in the ft layer? (would make it reusable) hint/autodetect? +- [ ] compression in the ft layer? (would make it reusable) hint/autodetect/autoenable for >1k ?