From c3082e0140b14a8d64faab42a65fbb418facc5f7 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sun, 31 Dec 2023 15:03:30 +0100 Subject: [PATCH] first actual gossip (very meh) --- src/solanaceae/crdtnotes/crdtnotes_sync.cpp | 64 +++++++++++++++++++++ src/solanaceae/crdtnotes/crdtnotes_sync.hpp | 4 ++ 2 files changed, 68 insertions(+) diff --git a/src/solanaceae/crdtnotes/crdtnotes_sync.cpp b/src/solanaceae/crdtnotes/crdtnotes_sync.cpp index ec85429..54a7ebf 100644 --- a/src/solanaceae/crdtnotes/crdtnotes_sync.cpp +++ b/src/solanaceae/crdtnotes/crdtnotes_sync.cpp @@ -1,5 +1,7 @@ #include "./crdtnotes_sync.hpp" +#include + #include #include @@ -24,6 +26,60 @@ CRDTNotesSync::~CRDTNotesSync(void) { } float CRDTNotesSync::iterate(float time_delta) { + if (!_gossip_queue.empty()) { + // TODO: set is sorted by id, not by order added + // only one per iterate *should* be enough + const auto it = _gossip_queue.cbegin(); + if (_docs_contacts.count(*it)) { + for (const auto& c : _docs_contacts.at(*it)) { + if (!c.all_of()) { + + // TODO: this is a fallback, remove + if (c.all_of()) { + for (const auto child : c.get().subs) { + if (!c.registry()->all_of(child)) { + continue; + } + + c.registry()->get(child)->SendGossip({*c.registry(), child}, *it); + } + } + continue; // skip, not impl + } + + c.get()->SendGossip(c, *it); + } + } + _gossip_queue.erase(it); + } + + if (!_fetch_frontier_queue.empty()) { + // TODO: set is sorted by id, not by order added + // only one per iterate *should* be enough + const auto it = _fetch_frontier_queue.cbegin(); + if (_docs_contacts.count(*it)) { + for (const auto& c : _docs_contacts.at(*it)) { + if (!c.all_of()) { + + // TODO: this is a fallback, remove + if (c.all_of()) { + for (const auto child : c.get().subs) { + if (!c.registry()->all_of(child)) { + continue; + } + + c.registry()->get(child)->SendFetchCompleteFrontier({*c.registry(), child}, *it); + } + } + continue; // skip, not impl + } + + c.get()->SendFetchCompleteFrontier(c, *it); + } + } + _fetch_frontier_queue.erase(it); + } + return 1.f; } @@ -66,6 +122,8 @@ std::optional CRDTNotesSync::addNewDoc(Contact3Handle c, bool _docs_contacts[new_id].emplace(c); } + _gossip_queue.emplace(new_id); + return new_id; } @@ -87,6 +145,12 @@ bool CRDTNotesSync::addDoc(const CRDTNotes::DocID& doc_id, Contact3Handle c) { _docs_contacts[doc_id].emplace(c); + if (doc_ptr != nullptr) { + // new for us + _gossip_queue.emplace(doc_id); + _fetch_frontier_queue.emplace(doc_id); + } + return doc_ptr != nullptr; } diff --git a/src/solanaceae/crdtnotes/crdtnotes_sync.hpp b/src/solanaceae/crdtnotes/crdtnotes_sync.hpp index 257c8ee..a7c2008 100644 --- a/src/solanaceae/crdtnotes/crdtnotes_sync.hpp +++ b/src/solanaceae/crdtnotes/crdtnotes_sync.hpp @@ -82,6 +82,10 @@ class CRDTNotesSync final : public CRDTNotesEventI { std::unordered_map> _docs_contacts; + // if a doc is eg new, it is added here + std::set _gossip_queue; + std::set _fetch_frontier_queue; + public: CRDTNotesSync(CRDTNotes& notes, Contact3Registry& cr);