send and apply ops, basic editing working, if no ops are missed

This commit is contained in:
Green Sky
2025-09-02 12:53:26 +02:00
parent 95f3c821a5
commit 31f5adfcc0
2 changed files with 139 additions and 2 deletions

View File

@@ -5,10 +5,23 @@
#include <solanaceae/contact/components.hpp>
#include <entt/container/dense_set.hpp>
#include <cstdint>
#include <vector>
#include <iostream>
namespace Components {
// attached to contact
struct OpSendQueue {
std::map<CRDTNotes::DocID, std::vector<CRDTNotes::Doc::Op>> ops;
// HACK: limit to 5 ops per packet for now
// TODO: ft based alternative for >5 ops
};
} // Components
static ID32 id_from_vec(const std::vector<uint8_t>& vec) {
ID32 new_id;
for (size_t i = 0; i < new_id.size() && i < vec.size(); i++) {
@@ -27,6 +40,25 @@ CRDTNotesSync::~CRDTNotesSync(void) {
}
float CRDTNotesSync::iterate(float time_delta) {
for (auto doc_it = _docs_incoming_ops.begin(); doc_it != _docs_incoming_ops.end();) {
if (_notes.isWriteLocked(doc_it->first)) {
doc_it++;
continue;
}
auto lock_opt = _notes.writeLockAquire(doc_it->first);
assert(lock_opt);
auto* doc_ptr = getDoc(doc_it->first);
// TODO: record every applied op and throw event, so eg gui can react better
// , or better yet, edit events in string space (imgui can consume them)
doc_ptr->apply(doc_it->second);
std::cout << "CRDTNotesSync: applied " << doc_it->second.size() << " ops\n";
doc_it = _docs_incoming_ops.erase(doc_it);
}
if (!_gossip_queue.empty()) {
// TODO: set is sorted by id, not by order added
// only one per iterate *should* be enough
@@ -89,7 +121,40 @@ float CRDTNotesSync::iterate(float time_delta) {
_fetch_frontier_queue.erase(it);
}
return 2.f;
bool sending_ops {false};
{ // send ops in queue
std::vector<Contact4> empty_queue;
for (const auto& [c, op_comp, sync_model] : _cs.registry().view<Components::OpSendQueue, CRDTNotesContactSyncModelI*>().each()) {
// HACK: one pkg with up to 5 ops per tick per peer
//for (const auto& [doc_id, op_vec] : op_comp.ops) {
for (auto it = op_comp.ops.begin(); it != op_comp.ops.end();) {
if (it->second.empty()) {
it = op_comp.ops.erase(it);
continue;
} else if (it->second.size() <= 5) {
std::cout << "sending " << it->second.size() << " ops\n";
sync_model->SendOps(_cs.contactHandle(c), it->first, it->second);
it = op_comp.ops.erase(it);
//sending_ops = true;
} else {
std::vector<CRDTNotes::Doc::Op> tmp_ops {it->second.cbegin(), it->second.cbegin()+5};
assert(tmp_ops.size() == 5);
sync_model->SendOps(_cs.contactHandle(c), it->first, tmp_ops);
it->second.erase(it->second.cbegin(), it->second.cbegin()+5);
sending_ops = true;
}
break; // single update only
}
if (op_comp.ops.empty()) {
empty_queue.push_back(c);
}
}
_cs.registry().remove<Components::OpSendQueue>(empty_queue.cbegin(), empty_queue.cend());
}
return sending_ops ? 0.05f : 2.f;
}
CRDTNotes::Doc* CRDTNotesSync::getDoc(const CRDTNotes::DocID& doc_id) {
@@ -203,7 +268,50 @@ void CRDTNotesSync::merge(const CRDTNotes::DocID& doc_id, std::string_view new_t
auto op_vec = doc_ptr->merge(new_text);
std::cout << "doc changed " << op_vec.size() << " ops generated\n";
// USE OPS
// attach OpSendQueue to every contact
// needs to be placed at the contact with the sync model
entt::dense_set<Contact4> handled_contacts;
for (auto c : _docs_contacts.at(doc_id)) {
if (handled_contacts.contains(c)) {
continue;
}
if (!c.all_of<CRDTNotesContactSyncModelI*>()) {
// TODO: this is a fallback, remove
if (c.all_of<Contact::Components::ParentOf>()) {
for (const auto child : c.get<Contact::Components::ParentOf>().subs) {
if (handled_contacts.contains(child)) {
continue;
}
if (c.registry()->all_of<Contact::Components::TagSelfStrong>(child)) {
continue;
}
if (!c.registry()->all_of<CRDTNotesContactSyncModelI*>(child)) {
std::cerr << "CRDTNotesSync error: fallback failed\n";
continue;
}
auto& op_queue = c.registry()->get_or_emplace<Components::OpSendQueue>(child).ops[doc_id];
if (op_queue.empty()) {
op_queue = op_vec;
} else {
op_queue.insert(op_queue.cend(), op_vec.cbegin(), op_vec.cend());
}
handled_contacts.emplace(child);
}
}
continue; // skip, not impl
}
auto& op_queue = c.get_or_emplace<Components::OpSendQueue>().ops[doc_id];
if (op_queue.empty()) {
op_queue = op_vec;
} else {
op_queue.insert(op_queue.cend(), op_vec.cbegin(), op_vec.cend());
}
handled_contacts.emplace(c);
}
}
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) {
@@ -220,5 +328,31 @@ void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_op_range&& e) {
}
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) {
addDoc(e.doc_id, e.c);
if (e.ops.empty()) {
std::cerr << "CRDTNotesSync warning: got empty ops event/pkg\n";
return;
}
// TODO: deduplicate ops ?
auto lock_opt = _notes.writeLockAquire(e.doc_id);
if (lock_opt) {
// TODO: perms n stuff
// TODO: check if seq missing
auto* doc_ptr = getDoc(e.doc_id);
// TODO: record every applied op and throw event, so eg gui can react better
// , or better yet, edit events in string space (imgui can consume them)
doc_ptr->apply(e.ops);
// TODO: check if new frontier
} else {
auto& op_in_vec = _docs_incoming_ops[e.doc_id];
if (op_in_vec.empty()) {
op_in_vec = e.ops;
} else {
op_in_vec.insert(op_in_vec.cend(), e.ops.cbegin(), e.ops.cend());
}
}
}

View File

@@ -98,6 +98,9 @@ class CRDTNotesSync final : public CRDTNotesEventI {
};
std::unordered_map<CRDTNotes::DocID, std::map<ContactHandle4, Peer>> _docs_peers;
// queue of unapplied ops, kept here until write lock can be aquired
std::unordered_map<CRDTNotes::DocID, std::vector<CRDTNotes::Doc::Op>> _docs_incoming_ops;
// if a doc is eg new, it is added here
std::set<CRDTNotes::DocID> _gossip_queue; // TODO: no
std::set<CRDTNotes::DocID> _fetch_frontier_queue;