Compare commits
3 Commits
396a03c229
...
master
Author | SHA1 | Date | |
---|---|---|---|
|
31f5adfcc0 | ||
|
95f3c821a5 | ||
|
9cab041af4 |
@@ -37,3 +37,20 @@ CRDTNotes::Doc* CRDTNotes::addDoc(const CRDTAgent& self_agent, const DocID& id)
|
|||||||
return &doc;
|
return &doc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void CRDTNotes::writeLockRelease(const DocID& id) {
|
||||||
|
assert(_doc_write_locks.count(id) > 0);
|
||||||
|
_doc_write_locks.erase(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool CRDTNotes::isWriteLocked(const DocID& id) const {
|
||||||
|
return _doc_write_locks.count(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::optional<CRDTNotes::DocWriteLock> CRDTNotes::writeLockAquire(const DocID& id) {
|
||||||
|
if (_doc_write_locks.count(id)) {
|
||||||
|
return std::nullopt; // replace with exception instead?
|
||||||
|
}
|
||||||
|
|
||||||
|
_doc_write_locks.emplace(id);
|
||||||
|
return DocWriteLock{*this, id};
|
||||||
|
}
|
||||||
|
@@ -6,6 +6,8 @@
|
|||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
#include <unordered_set>
|
||||||
|
#include <optional>
|
||||||
|
|
||||||
using ID32 = std::array<uint8_t, 32>;
|
using ID32 = std::array<uint8_t, 32>;
|
||||||
|
|
||||||
@@ -37,9 +39,23 @@ class CRDTNotes {
|
|||||||
uint64_t seq{0};
|
uint64_t seq{0};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// RAII lock wrapper
|
||||||
|
struct DocWriteLock {
|
||||||
|
CRDTNotes* notes;
|
||||||
|
DocID id;
|
||||||
|
|
||||||
|
// ctr assumes lock
|
||||||
|
DocWriteLock(CRDTNotes& notes, const DocID& id) : notes(¬es), id(id) {}
|
||||||
|
DocWriteLock(const DocWriteLock&) = delete;
|
||||||
|
DocWriteLock(DocWriteLock&& other) : notes(other.notes), id(other.id) { other.notes = nullptr; }
|
||||||
|
~DocWriteLock(void) { if (notes) { notes->writeLockRelease(id); } }
|
||||||
|
bool operator==(const DocWriteLock& other) const { return id == other.id; }
|
||||||
|
};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// TODO: add metadata to docs
|
// TODO: add metadata to docs
|
||||||
std::unordered_map<DocID, Doc> _docs;
|
std::unordered_map<DocID, Doc> _docs;
|
||||||
|
std::unordered_set<DocID> _doc_write_locks;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
// config?
|
// config?
|
||||||
@@ -52,5 +68,16 @@ class CRDTNotes {
|
|||||||
Doc* getDoc(const DocID& id);
|
Doc* getDoc(const DocID& id);
|
||||||
|
|
||||||
Doc* addDoc(const CRDTAgent& self_agent, const DocID& doc);
|
Doc* addDoc(const CRDTAgent& self_agent, const DocID& doc);
|
||||||
|
|
||||||
|
void writeLockRelease(const DocID& id);
|
||||||
|
bool isWriteLocked(const DocID& id) const;
|
||||||
|
std::optional<DocWriteLock> writeLockAquire(const DocID& id);
|
||||||
|
};
|
||||||
|
|
||||||
|
template<>
|
||||||
|
struct std::hash<CRDTNotes::DocWriteLock> {
|
||||||
|
std::uint64_t operator()(const CRDTNotes::DocWriteLock& s) const noexcept {
|
||||||
|
return std::hash<ID32>{}(s.id);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@@ -5,10 +5,23 @@
|
|||||||
|
|
||||||
#include <solanaceae/contact/components.hpp>
|
#include <solanaceae/contact/components.hpp>
|
||||||
|
|
||||||
|
#include <entt/container/dense_set.hpp>
|
||||||
|
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
||||||
|
namespace Components {
|
||||||
|
|
||||||
|
// attached to contact
|
||||||
|
struct OpSendQueue {
|
||||||
|
std::map<CRDTNotes::DocID, std::vector<CRDTNotes::Doc::Op>> ops;
|
||||||
|
// HACK: limit to 5 ops per packet for now
|
||||||
|
// TODO: ft based alternative for >5 ops
|
||||||
|
};
|
||||||
|
|
||||||
|
} // Components
|
||||||
|
|
||||||
static ID32 id_from_vec(const std::vector<uint8_t>& vec) {
|
static ID32 id_from_vec(const std::vector<uint8_t>& vec) {
|
||||||
ID32 new_id;
|
ID32 new_id;
|
||||||
for (size_t i = 0; i < new_id.size() && i < vec.size(); i++) {
|
for (size_t i = 0; i < new_id.size() && i < vec.size(); i++) {
|
||||||
@@ -27,6 +40,25 @@ CRDTNotesSync::~CRDTNotesSync(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
float CRDTNotesSync::iterate(float time_delta) {
|
float CRDTNotesSync::iterate(float time_delta) {
|
||||||
|
for (auto doc_it = _docs_incoming_ops.begin(); doc_it != _docs_incoming_ops.end();) {
|
||||||
|
if (_notes.isWriteLocked(doc_it->first)) {
|
||||||
|
doc_it++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto lock_opt = _notes.writeLockAquire(doc_it->first);
|
||||||
|
assert(lock_opt);
|
||||||
|
|
||||||
|
auto* doc_ptr = getDoc(doc_it->first);
|
||||||
|
// TODO: record every applied op and throw event, so eg gui can react better
|
||||||
|
// , or better yet, edit events in string space (imgui can consume them)
|
||||||
|
doc_ptr->apply(doc_it->second);
|
||||||
|
|
||||||
|
std::cout << "CRDTNotesSync: applied " << doc_it->second.size() << " ops\n";
|
||||||
|
|
||||||
|
doc_it = _docs_incoming_ops.erase(doc_it);
|
||||||
|
}
|
||||||
|
|
||||||
if (!_gossip_queue.empty()) {
|
if (!_gossip_queue.empty()) {
|
||||||
// TODO: set is sorted by id, not by order added
|
// TODO: set is sorted by id, not by order added
|
||||||
// only one per iterate *should* be enough
|
// only one per iterate *should* be enough
|
||||||
@@ -89,7 +121,40 @@ float CRDTNotesSync::iterate(float time_delta) {
|
|||||||
_fetch_frontier_queue.erase(it);
|
_fetch_frontier_queue.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 2.f;
|
bool sending_ops {false};
|
||||||
|
{ // send ops in queue
|
||||||
|
std::vector<Contact4> empty_queue;
|
||||||
|
for (const auto& [c, op_comp, sync_model] : _cs.registry().view<Components::OpSendQueue, CRDTNotesContactSyncModelI*>().each()) {
|
||||||
|
// HACK: one pkg with up to 5 ops per tick per peer
|
||||||
|
//for (const auto& [doc_id, op_vec] : op_comp.ops) {
|
||||||
|
for (auto it = op_comp.ops.begin(); it != op_comp.ops.end();) {
|
||||||
|
if (it->second.empty()) {
|
||||||
|
it = op_comp.ops.erase(it);
|
||||||
|
continue;
|
||||||
|
} else if (it->second.size() <= 5) {
|
||||||
|
std::cout << "sending " << it->second.size() << " ops\n";
|
||||||
|
sync_model->SendOps(_cs.contactHandle(c), it->first, it->second);
|
||||||
|
it = op_comp.ops.erase(it);
|
||||||
|
//sending_ops = true;
|
||||||
|
} else {
|
||||||
|
std::vector<CRDTNotes::Doc::Op> tmp_ops {it->second.cbegin(), it->second.cbegin()+5};
|
||||||
|
assert(tmp_ops.size() == 5);
|
||||||
|
sync_model->SendOps(_cs.contactHandle(c), it->first, tmp_ops);
|
||||||
|
it->second.erase(it->second.cbegin(), it->second.cbegin()+5);
|
||||||
|
sending_ops = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
break; // single update only
|
||||||
|
}
|
||||||
|
|
||||||
|
if (op_comp.ops.empty()) {
|
||||||
|
empty_queue.push_back(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_cs.registry().remove<Components::OpSendQueue>(empty_queue.cbegin(), empty_queue.cend());
|
||||||
|
}
|
||||||
|
|
||||||
|
return sending_ops ? 0.05f : 2.f;
|
||||||
}
|
}
|
||||||
|
|
||||||
CRDTNotes::Doc* CRDTNotesSync::getDoc(const CRDTNotes::DocID& doc_id) {
|
CRDTNotes::Doc* CRDTNotesSync::getDoc(const CRDTNotes::DocID& doc_id) {
|
||||||
@@ -203,7 +268,50 @@ void CRDTNotesSync::merge(const CRDTNotes::DocID& doc_id, std::string_view new_t
|
|||||||
auto op_vec = doc_ptr->merge(new_text);
|
auto op_vec = doc_ptr->merge(new_text);
|
||||||
std::cout << "doc changed " << op_vec.size() << " ops generated\n";
|
std::cout << "doc changed " << op_vec.size() << " ops generated\n";
|
||||||
|
|
||||||
// USE OPS
|
// attach OpSendQueue to every contact
|
||||||
|
// needs to be placed at the contact with the sync model
|
||||||
|
entt::dense_set<Contact4> handled_contacts;
|
||||||
|
for (auto c : _docs_contacts.at(doc_id)) {
|
||||||
|
if (handled_contacts.contains(c)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!c.all_of<CRDTNotesContactSyncModelI*>()) {
|
||||||
|
|
||||||
|
// TODO: this is a fallback, remove
|
||||||
|
if (c.all_of<Contact::Components::ParentOf>()) {
|
||||||
|
for (const auto child : c.get<Contact::Components::ParentOf>().subs) {
|
||||||
|
if (handled_contacts.contains(child)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (c.registry()->all_of<Contact::Components::TagSelfStrong>(child)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!c.registry()->all_of<CRDTNotesContactSyncModelI*>(child)) {
|
||||||
|
std::cerr << "CRDTNotesSync error: fallback failed\n";
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto& op_queue = c.registry()->get_or_emplace<Components::OpSendQueue>(child).ops[doc_id];
|
||||||
|
if (op_queue.empty()) {
|
||||||
|
op_queue = op_vec;
|
||||||
|
} else {
|
||||||
|
op_queue.insert(op_queue.cend(), op_vec.cbegin(), op_vec.cend());
|
||||||
|
}
|
||||||
|
handled_contacts.emplace(child);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue; // skip, not impl
|
||||||
|
}
|
||||||
|
|
||||||
|
auto& op_queue = c.get_or_emplace<Components::OpSendQueue>().ops[doc_id];
|
||||||
|
if (op_queue.empty()) {
|
||||||
|
op_queue = op_vec;
|
||||||
|
} else {
|
||||||
|
op_queue.insert(op_queue.cend(), op_vec.cbegin(), op_vec.cend());
|
||||||
|
}
|
||||||
|
handled_contacts.emplace(c);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) {
|
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) {
|
||||||
@@ -220,5 +328,31 @@ void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_op_range&& e) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) {
|
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) {
|
||||||
|
addDoc(e.doc_id, e.c);
|
||||||
|
|
||||||
|
if (e.ops.empty()) {
|
||||||
|
std::cerr << "CRDTNotesSync warning: got empty ops event/pkg\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: deduplicate ops ?
|
||||||
|
auto lock_opt = _notes.writeLockAquire(e.doc_id);
|
||||||
|
if (lock_opt) {
|
||||||
|
// TODO: perms n stuff
|
||||||
|
// TODO: check if seq missing
|
||||||
|
auto* doc_ptr = getDoc(e.doc_id);
|
||||||
|
// TODO: record every applied op and throw event, so eg gui can react better
|
||||||
|
// , or better yet, edit events in string space (imgui can consume them)
|
||||||
|
doc_ptr->apply(e.ops);
|
||||||
|
|
||||||
|
// TODO: check if new frontier
|
||||||
|
} else {
|
||||||
|
auto& op_in_vec = _docs_incoming_ops[e.doc_id];
|
||||||
|
if (op_in_vec.empty()) {
|
||||||
|
op_in_vec = e.ops;
|
||||||
|
} else {
|
||||||
|
op_in_vec.insert(op_in_vec.cend(), e.ops.cbegin(), e.ops.cend());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -98,6 +98,9 @@ class CRDTNotesSync final : public CRDTNotesEventI {
|
|||||||
};
|
};
|
||||||
std::unordered_map<CRDTNotes::DocID, std::map<ContactHandle4, Peer>> _docs_peers;
|
std::unordered_map<CRDTNotes::DocID, std::map<ContactHandle4, Peer>> _docs_peers;
|
||||||
|
|
||||||
|
// queue of unapplied ops, kept here until write lock can be aquired
|
||||||
|
std::unordered_map<CRDTNotes::DocID, std::vector<CRDTNotes::Doc::Op>> _docs_incoming_ops;
|
||||||
|
|
||||||
// if a doc is eg new, it is added here
|
// if a doc is eg new, it is added here
|
||||||
std::set<CRDTNotes::DocID> _gossip_queue; // TODO: no
|
std::set<CRDTNotes::DocID> _gossip_queue; // TODO: no
|
||||||
std::set<CRDTNotes::DocID> _fetch_frontier_queue;
|
std::set<CRDTNotes::DocID> _fetch_frontier_queue;
|
||||||
|
@@ -9,7 +9,6 @@
|
|||||||
#include <imgui.h>
|
#include <imgui.h>
|
||||||
#include <misc/cpp/imgui_stdlib.h>
|
#include <misc/cpp/imgui_stdlib.h>
|
||||||
|
|
||||||
#include <iostream>
|
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
|
||||||
namespace detail {
|
namespace detail {
|
||||||
@@ -48,6 +47,12 @@ namespace detail {
|
|||||||
} // detail
|
} // detail
|
||||||
|
|
||||||
|
|
||||||
|
std::unordered_set<CRDTNotes::DocWriteLock>::iterator CRDTNotesImGui::findLock(const CRDTNotes::DocID& doc_id) {
|
||||||
|
auto it = _held_locks.begin();
|
||||||
|
for (; it != _held_locks.end() && it->id != doc_id; it++) {}
|
||||||
|
return it;
|
||||||
|
}
|
||||||
|
|
||||||
CRDTNotesImGui::CRDTNotesImGui(CRDTNotes& notes, CRDTNotesSync& notes_sync, ContactStore4I& cs) : _notes(notes), _notes_sync(notes_sync), _cs(cs) {
|
CRDTNotesImGui::CRDTNotesImGui(CRDTNotes& notes, CRDTNotesSync& notes_sync, ContactStore4I& cs) : _notes(notes), _notes_sync(notes_sync), _cs(cs) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -142,17 +147,36 @@ bool CRDTNotesImGui::renderDoc(const CRDTNotes::DocID& doc_id) {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto lock_it = findLock(doc_id);
|
||||||
|
bool self_held = lock_it != _held_locks.end();
|
||||||
|
const bool foreign_held = !self_held && _notes.isWriteLocked(doc_id);
|
||||||
|
|
||||||
auto text = doc->getText();
|
auto text = doc->getText();
|
||||||
if (renderDocText(text)) {
|
ImGui::InputTextMultiline(
|
||||||
|
"##doc",
|
||||||
|
&text,
|
||||||
|
{-1,-1},
|
||||||
|
ImGuiInputTextFlags_AllowTabInput |
|
||||||
|
(foreign_held ? ImGuiInputTextFlags_ReadOnly : ImGuiInputTextFlags_None) |
|
||||||
|
ImGuiInputTextFlags_CallbackAlways
|
||||||
|
//cb,
|
||||||
|
//&text
|
||||||
|
);
|
||||||
|
if (!foreign_held && !self_held && (ImGui::IsItemActive() || ImGui::IsItemEdited())) {
|
||||||
|
// TODO: check
|
||||||
|
_held_locks.emplace(_notes.writeLockAquire(doc_id).value());
|
||||||
|
self_held = true;
|
||||||
|
//std::cout << "!!!! imgui lock aquired\n";
|
||||||
|
} else if (!foreign_held && self_held && !(ImGui::IsItemActive() || ImGui::IsItemEdited())) {
|
||||||
|
// release lock
|
||||||
|
_held_locks.erase(lock_it);
|
||||||
|
//std::cout << "!!!! imgui lock released\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (self_held && ImGui::IsItemEdited()) {
|
||||||
_notes_sync.merge(doc_id, text);
|
_notes_sync.merge(doc_id, text);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool CRDTNotesImGui::renderDocText(std::string& text) const {
|
|
||||||
// TODO: replace with text editor (zep) or visualize stuff??
|
|
||||||
return ImGui::InputTextMultiline("##doc", &text, {-1,-1}, ImGuiInputTextFlags_AllowTabInput);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
@@ -4,6 +4,7 @@
|
|||||||
#include <solanaceae/contact/fwd.hpp>
|
#include <solanaceae/contact/fwd.hpp>
|
||||||
|
|
||||||
#include <set>
|
#include <set>
|
||||||
|
#include <unordered_set>
|
||||||
|
|
||||||
class CRDTNotesImGui {
|
class CRDTNotesImGui {
|
||||||
CRDTNotes& _notes;
|
CRDTNotes& _notes;
|
||||||
@@ -13,6 +14,9 @@ class CRDTNotesImGui {
|
|||||||
bool _show_global_list {true};
|
bool _show_global_list {true};
|
||||||
|
|
||||||
std::set<CRDTNotes::DocID> _open_docs;
|
std::set<CRDTNotes::DocID> _open_docs;
|
||||||
|
std::unordered_set<CRDTNotes::DocWriteLock> _held_locks;
|
||||||
|
|
||||||
|
std::unordered_set<CRDTNotes::DocWriteLock>::iterator findLock(const CRDTNotes::DocID& doc_id);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
CRDTNotesImGui(CRDTNotes& notes, CRDTNotesSync& notes_sync, ContactStore4I& cs);
|
CRDTNotesImGui(CRDTNotes& notes, CRDTNotesSync& notes_sync, ContactStore4I& cs);
|
||||||
@@ -22,6 +26,5 @@ class CRDTNotesImGui {
|
|||||||
bool renderContactListContactSmall(const Contact4 c, const bool selected) const;
|
bool renderContactListContactSmall(const Contact4 c, const bool selected) const;
|
||||||
|
|
||||||
bool renderDoc(const CRDTNotes::DocID& doc_id);
|
bool renderDoc(const CRDTNotes::DocID& doc_id);
|
||||||
bool renderDocText(std::string& text) const;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user