first actual gossip (very meh)
This commit is contained in:
parent
e9e5ad88a1
commit
c3082e0140
@ -1,5 +1,7 @@
|
||||
#include "./crdtnotes_sync.hpp"
|
||||
|
||||
#include <solanaceae/crdtnotes/crdtnotes_contact_sync_model.hpp>
|
||||
|
||||
#include <solanaceae/contact/components.hpp>
|
||||
|
||||
#include <cstdint>
|
||||
@ -24,6 +26,60 @@ CRDTNotesSync::~CRDTNotesSync(void) {
|
||||
}
|
||||
|
||||
float CRDTNotesSync::iterate(float time_delta) {
|
||||
if (!_gossip_queue.empty()) {
|
||||
// TODO: set is sorted by id, not by order added
|
||||
// only one per iterate *should* be enough
|
||||
const auto it = _gossip_queue.cbegin();
|
||||
if (_docs_contacts.count(*it)) {
|
||||
for (const auto& c : _docs_contacts.at(*it)) {
|
||||
if (!c.all_of<CRDTNotesContactSyncModelI*>()) {
|
||||
|
||||
// TODO: this is a fallback, remove
|
||||
if (c.all_of<Contact::Components::ParentOf>()) {
|
||||
for (const auto child : c.get<Contact::Components::ParentOf>().subs) {
|
||||
if (!c.registry()->all_of<CRDTNotesContactSyncModelI*>(child)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
c.registry()->get<CRDTNotesContactSyncModelI*>(child)->SendGossip({*c.registry(), child}, *it);
|
||||
}
|
||||
}
|
||||
continue; // skip, not impl
|
||||
}
|
||||
|
||||
c.get<CRDTNotesContactSyncModelI*>()->SendGossip(c, *it);
|
||||
}
|
||||
}
|
||||
_gossip_queue.erase(it);
|
||||
}
|
||||
|
||||
if (!_fetch_frontier_queue.empty()) {
|
||||
// TODO: set is sorted by id, not by order added
|
||||
// only one per iterate *should* be enough
|
||||
const auto it = _fetch_frontier_queue.cbegin();
|
||||
if (_docs_contacts.count(*it)) {
|
||||
for (const auto& c : _docs_contacts.at(*it)) {
|
||||
if (!c.all_of<CRDTNotesContactSyncModelI*>()) {
|
||||
|
||||
// TODO: this is a fallback, remove
|
||||
if (c.all_of<Contact::Components::ParentOf>()) {
|
||||
for (const auto child : c.get<Contact::Components::ParentOf>().subs) {
|
||||
if (!c.registry()->all_of<CRDTNotesContactSyncModelI*>(child)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
c.registry()->get<CRDTNotesContactSyncModelI*>(child)->SendFetchCompleteFrontier({*c.registry(), child}, *it);
|
||||
}
|
||||
}
|
||||
continue; // skip, not impl
|
||||
}
|
||||
|
||||
c.get<CRDTNotesContactSyncModelI*>()->SendFetchCompleteFrontier(c, *it);
|
||||
}
|
||||
}
|
||||
_fetch_frontier_queue.erase(it);
|
||||
}
|
||||
|
||||
return 1.f;
|
||||
}
|
||||
|
||||
@ -66,6 +122,8 @@ std::optional<CRDTNotes::DocID> CRDTNotesSync::addNewDoc(Contact3Handle c, bool
|
||||
_docs_contacts[new_id].emplace(c);
|
||||
}
|
||||
|
||||
_gossip_queue.emplace(new_id);
|
||||
|
||||
return new_id;
|
||||
}
|
||||
|
||||
@ -87,6 +145,12 @@ bool CRDTNotesSync::addDoc(const CRDTNotes::DocID& doc_id, Contact3Handle c) {
|
||||
|
||||
_docs_contacts[doc_id].emplace(c);
|
||||
|
||||
if (doc_ptr != nullptr) {
|
||||
// new for us
|
||||
_gossip_queue.emplace(doc_id);
|
||||
_fetch_frontier_queue.emplace(doc_id);
|
||||
}
|
||||
|
||||
return doc_ptr != nullptr;
|
||||
}
|
||||
|
||||
|
@ -82,6 +82,10 @@ class CRDTNotesSync final : public CRDTNotesEventI {
|
||||
|
||||
std::unordered_map<CRDTNotes::DocID, std::set<Contact3Handle>> _docs_contacts;
|
||||
|
||||
// if a doc is eg new, it is added here
|
||||
std::set<CRDTNotes::DocID> _gossip_queue;
|
||||
std::set<CRDTNotes::DocID> _fetch_frontier_queue;
|
||||
|
||||
public:
|
||||
CRDTNotesSync(CRDTNotes& notes, Contact3Registry& cr);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user