From 31f5adfcc09340af76d1481327b13f1ba56d6ca4 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Tue, 2 Sep 2025 12:53:26 +0200 Subject: [PATCH] send and apply ops, basic editing working, if no ops are missed --- src/solanaceae/crdtnotes/crdtnotes_sync.cpp | 138 +++++++++++++++++++- src/solanaceae/crdtnotes/crdtnotes_sync.hpp | 3 + 2 files changed, 139 insertions(+), 2 deletions(-) diff --git a/src/solanaceae/crdtnotes/crdtnotes_sync.cpp b/src/solanaceae/crdtnotes/crdtnotes_sync.cpp index ff8a440..08d3303 100644 --- a/src/solanaceae/crdtnotes/crdtnotes_sync.cpp +++ b/src/solanaceae/crdtnotes/crdtnotes_sync.cpp @@ -5,10 +5,23 @@ #include +#include + #include #include #include +namespace Components { + +// attached to contact +struct OpSendQueue { + std::map> ops; + // HACK: limit to 5 ops per packet for now + // TODO: ft based alternative for >5 ops +}; + +} // Components + static ID32 id_from_vec(const std::vector& vec) { ID32 new_id; for (size_t i = 0; i < new_id.size() && i < vec.size(); i++) { @@ -27,6 +40,25 @@ CRDTNotesSync::~CRDTNotesSync(void) { } float CRDTNotesSync::iterate(float time_delta) { + for (auto doc_it = _docs_incoming_ops.begin(); doc_it != _docs_incoming_ops.end();) { + if (_notes.isWriteLocked(doc_it->first)) { + doc_it++; + continue; + } + + auto lock_opt = _notes.writeLockAquire(doc_it->first); + assert(lock_opt); + + auto* doc_ptr = getDoc(doc_it->first); + // TODO: record every applied op and throw event, so eg gui can react better + // , or better yet, edit events in string space (imgui can consume them) + doc_ptr->apply(doc_it->second); + + std::cout << "CRDTNotesSync: applied " << doc_it->second.size() << " ops\n"; + + doc_it = _docs_incoming_ops.erase(doc_it); + } + if (!_gossip_queue.empty()) { // TODO: set is sorted by id, not by order added // only one per iterate *should* be enough @@ -89,7 +121,40 @@ float CRDTNotesSync::iterate(float time_delta) { _fetch_frontier_queue.erase(it); } - return 2.f; + bool sending_ops {false}; + { // send ops in queue + std::vector empty_queue; + for (const auto& [c, op_comp, sync_model] : _cs.registry().view().each()) { + // HACK: one pkg with up to 5 ops per tick per peer + //for (const auto& [doc_id, op_vec] : op_comp.ops) { + for (auto it = op_comp.ops.begin(); it != op_comp.ops.end();) { + if (it->second.empty()) { + it = op_comp.ops.erase(it); + continue; + } else if (it->second.size() <= 5) { + std::cout << "sending " << it->second.size() << " ops\n"; + sync_model->SendOps(_cs.contactHandle(c), it->first, it->second); + it = op_comp.ops.erase(it); + //sending_ops = true; + } else { + std::vector tmp_ops {it->second.cbegin(), it->second.cbegin()+5}; + assert(tmp_ops.size() == 5); + sync_model->SendOps(_cs.contactHandle(c), it->first, tmp_ops); + it->second.erase(it->second.cbegin(), it->second.cbegin()+5); + sending_ops = true; + } + + break; // single update only + } + + if (op_comp.ops.empty()) { + empty_queue.push_back(c); + } + } + _cs.registry().remove(empty_queue.cbegin(), empty_queue.cend()); + } + + return sending_ops ? 0.05f : 2.f; } CRDTNotes::Doc* CRDTNotesSync::getDoc(const CRDTNotes::DocID& doc_id) { @@ -203,7 +268,50 @@ void CRDTNotesSync::merge(const CRDTNotes::DocID& doc_id, std::string_view new_t auto op_vec = doc_ptr->merge(new_text); std::cout << "doc changed " << op_vec.size() << " ops generated\n"; - // USE OPS + // attach OpSendQueue to every contact + // needs to be placed at the contact with the sync model + entt::dense_set handled_contacts; + for (auto c : _docs_contacts.at(doc_id)) { + if (handled_contacts.contains(c)) { + continue; + } + + if (!c.all_of()) { + + // TODO: this is a fallback, remove + if (c.all_of()) { + for (const auto child : c.get().subs) { + if (handled_contacts.contains(child)) { + continue; + } + if (c.registry()->all_of(child)) { + continue; + } + if (!c.registry()->all_of(child)) { + std::cerr << "CRDTNotesSync error: fallback failed\n"; + continue; + } + + auto& op_queue = c.registry()->get_or_emplace(child).ops[doc_id]; + if (op_queue.empty()) { + op_queue = op_vec; + } else { + op_queue.insert(op_queue.cend(), op_vec.cbegin(), op_vec.cend()); + } + handled_contacts.emplace(child); + } + } + continue; // skip, not impl + } + + auto& op_queue = c.get_or_emplace().ops[doc_id]; + if (op_queue.empty()) { + op_queue = op_vec; + } else { + op_queue.insert(op_queue.cend(), op_vec.cbegin(), op_vec.cend()); + } + handled_contacts.emplace(c); + } } void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) { @@ -220,5 +328,31 @@ void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_op_range&& e) { } void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) { + addDoc(e.doc_id, e.c); + + if (e.ops.empty()) { + std::cerr << "CRDTNotesSync warning: got empty ops event/pkg\n"; + return; + } + + // TODO: deduplicate ops ? + auto lock_opt = _notes.writeLockAquire(e.doc_id); + if (lock_opt) { + // TODO: perms n stuff + // TODO: check if seq missing + auto* doc_ptr = getDoc(e.doc_id); + // TODO: record every applied op and throw event, so eg gui can react better + // , or better yet, edit events in string space (imgui can consume them) + doc_ptr->apply(e.ops); + + // TODO: check if new frontier + } else { + auto& op_in_vec = _docs_incoming_ops[e.doc_id]; + if (op_in_vec.empty()) { + op_in_vec = e.ops; + } else { + op_in_vec.insert(op_in_vec.cend(), e.ops.cbegin(), e.ops.cend()); + } + } } diff --git a/src/solanaceae/crdtnotes/crdtnotes_sync.hpp b/src/solanaceae/crdtnotes/crdtnotes_sync.hpp index 168388e..75ac45c 100644 --- a/src/solanaceae/crdtnotes/crdtnotes_sync.hpp +++ b/src/solanaceae/crdtnotes/crdtnotes_sync.hpp @@ -98,6 +98,9 @@ class CRDTNotesSync final : public CRDTNotesEventI { }; std::unordered_map> _docs_peers; + // queue of unapplied ops, kept here until write lock can be aquired + std::unordered_map> _docs_incoming_ops; + // if a doc is eg new, it is added here std::set _gossip_queue; // TODO: no std::set _fetch_frontier_queue;