Compare commits
	
		
			3 Commits
		
	
	
		
			396a03c229
			...
			31f5adfcc0
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 31f5adfcc0 | ||
|  | 95f3c821a5 | ||
|  | 9cab041af4 | 
| @@ -37,3 +37,20 @@ CRDTNotes::Doc* CRDTNotes::addDoc(const CRDTAgent& self_agent, const DocID& id) | |||||||
| 	return &doc; | 	return &doc; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | void CRDTNotes::writeLockRelease(const DocID& id) { | ||||||
|  | 	assert(_doc_write_locks.count(id) > 0); | ||||||
|  | 	_doc_write_locks.erase(id); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | bool CRDTNotes::isWriteLocked(const DocID& id) const { | ||||||
|  | 	return _doc_write_locks.count(id); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | std::optional<CRDTNotes::DocWriteLock> CRDTNotes::writeLockAquire(const DocID& id) { | ||||||
|  | 	if (_doc_write_locks.count(id)) { | ||||||
|  | 		return std::nullopt; // replace with exception instead? | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	_doc_write_locks.emplace(id); | ||||||
|  | 	return DocWriteLock{*this, id}; | ||||||
|  | } | ||||||
|   | |||||||
| @@ -6,6 +6,8 @@ | |||||||
| #include <cstdint> | #include <cstdint> | ||||||
| #include <functional> | #include <functional> | ||||||
| #include <unordered_map> | #include <unordered_map> | ||||||
|  | #include <unordered_set> | ||||||
|  | #include <optional> | ||||||
|  |  | ||||||
| using ID32 = std::array<uint8_t, 32>; | using ID32 = std::array<uint8_t, 32>; | ||||||
|  |  | ||||||
| @@ -37,9 +39,23 @@ class CRDTNotes { | |||||||
| 			uint64_t seq{0}; | 			uint64_t seq{0}; | ||||||
| 		}; | 		}; | ||||||
|  |  | ||||||
|  | 		// RAII lock wrapper | ||||||
|  | 		struct DocWriteLock { | ||||||
|  | 			CRDTNotes* notes; | ||||||
|  | 			DocID id; | ||||||
|  |  | ||||||
|  | 			// ctr assumes lock | ||||||
|  | 			DocWriteLock(CRDTNotes& notes, const DocID& id) : notes(¬es), id(id) {} | ||||||
|  | 			DocWriteLock(const DocWriteLock&) = delete; | ||||||
|  | 			DocWriteLock(DocWriteLock&& other) : notes(other.notes), id(other.id) { other.notes = nullptr; } | ||||||
|  | 			~DocWriteLock(void) { if (notes) { notes->writeLockRelease(id); } } | ||||||
|  | 			bool operator==(const DocWriteLock& other) const { return id == other.id; } | ||||||
|  | 		}; | ||||||
|  |  | ||||||
| 	private: | 	private: | ||||||
| 		// TODO: add metadata to docs | 		// TODO: add metadata to docs | ||||||
| 		std::unordered_map<DocID, Doc> _docs; | 		std::unordered_map<DocID, Doc> _docs; | ||||||
|  | 		std::unordered_set<DocID> _doc_write_locks; | ||||||
|  |  | ||||||
| 	public: | 	public: | ||||||
| 		// config? | 		// config? | ||||||
| @@ -52,5 +68,16 @@ class CRDTNotes { | |||||||
| 		Doc* getDoc(const DocID& id); | 		Doc* getDoc(const DocID& id); | ||||||
|  |  | ||||||
| 		Doc* addDoc(const CRDTAgent& self_agent, const DocID& doc); | 		Doc* addDoc(const CRDTAgent& self_agent, const DocID& doc); | ||||||
|  |  | ||||||
|  | 		void writeLockRelease(const DocID& id); | ||||||
|  | 		bool isWriteLocked(const DocID& id) const; | ||||||
|  | 		std::optional<DocWriteLock> writeLockAquire(const DocID& id); | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | template<> | ||||||
|  | struct std::hash<CRDTNotes::DocWriteLock> { | ||||||
|  | 	std::uint64_t operator()(const CRDTNotes::DocWriteLock& s) const noexcept { | ||||||
|  | 		return std::hash<ID32>{}(s.id); | ||||||
|  | 	} | ||||||
| }; | }; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -5,10 +5,23 @@ | |||||||
|  |  | ||||||
| #include <solanaceae/contact/components.hpp> | #include <solanaceae/contact/components.hpp> | ||||||
|  |  | ||||||
|  | #include <entt/container/dense_set.hpp> | ||||||
|  |  | ||||||
| #include <cstdint> | #include <cstdint> | ||||||
| #include <vector> | #include <vector> | ||||||
| #include <iostream> | #include <iostream> | ||||||
|  |  | ||||||
|  | namespace Components { | ||||||
|  |  | ||||||
|  | // attached to contact | ||||||
|  | struct OpSendQueue { | ||||||
|  | 	std::map<CRDTNotes::DocID, std::vector<CRDTNotes::Doc::Op>> ops; | ||||||
|  | 	// HACK: limit to 5 ops per packet for now | ||||||
|  | 	// TODO: ft based alternative for >5 ops | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | } // Components | ||||||
|  |  | ||||||
| static ID32 id_from_vec(const std::vector<uint8_t>& vec) { | static ID32 id_from_vec(const std::vector<uint8_t>& vec) { | ||||||
| 	ID32 new_id; | 	ID32 new_id; | ||||||
| 	for (size_t i = 0; i < new_id.size() && i < vec.size(); i++) { | 	for (size_t i = 0; i < new_id.size() && i < vec.size(); i++) { | ||||||
| @@ -27,6 +40,25 @@ CRDTNotesSync::~CRDTNotesSync(void) { | |||||||
| } | } | ||||||
|  |  | ||||||
| float CRDTNotesSync::iterate(float time_delta) { | float CRDTNotesSync::iterate(float time_delta) { | ||||||
|  | 	for (auto doc_it = _docs_incoming_ops.begin(); doc_it != _docs_incoming_ops.end();) { | ||||||
|  | 		if (_notes.isWriteLocked(doc_it->first)) { | ||||||
|  | 			doc_it++; | ||||||
|  | 			continue; | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		auto lock_opt = _notes.writeLockAquire(doc_it->first); | ||||||
|  | 		assert(lock_opt); | ||||||
|  |  | ||||||
|  | 		auto* doc_ptr = getDoc(doc_it->first); | ||||||
|  | 		// TODO: record every applied op and throw event, so eg gui can react better | ||||||
|  | 		// , or better yet, edit events in string space (imgui can consume them) | ||||||
|  | 		doc_ptr->apply(doc_it->second); | ||||||
|  |  | ||||||
|  | 		std::cout << "CRDTNotesSync: applied " << doc_it->second.size() << " ops\n"; | ||||||
|  |  | ||||||
|  | 		doc_it = _docs_incoming_ops.erase(doc_it); | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	if (!_gossip_queue.empty()) { | 	if (!_gossip_queue.empty()) { | ||||||
| 		// TODO: set is sorted by id, not by order added | 		// TODO: set is sorted by id, not by order added | ||||||
| 		// only one per iterate *should* be enough | 		// only one per iterate *should* be enough | ||||||
| @@ -89,7 +121,40 @@ float CRDTNotesSync::iterate(float time_delta) { | |||||||
| 		_fetch_frontier_queue.erase(it); | 		_fetch_frontier_queue.erase(it); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return 2.f; | 	bool sending_ops {false}; | ||||||
|  | 	{ // send ops in queue | ||||||
|  | 		std::vector<Contact4> empty_queue; | ||||||
|  | 		for (const auto& [c, op_comp, sync_model] : _cs.registry().view<Components::OpSendQueue, CRDTNotesContactSyncModelI*>().each()) { | ||||||
|  | 			// HACK: one pkg with up to 5 ops per tick per peer | ||||||
|  | 			//for (const auto& [doc_id, op_vec] : op_comp.ops) { | ||||||
|  | 			for (auto it = op_comp.ops.begin(); it != op_comp.ops.end();) { | ||||||
|  | 				if (it->second.empty()) { | ||||||
|  | 					it = op_comp.ops.erase(it); | ||||||
|  | 					continue; | ||||||
|  | 				} else if (it->second.size() <= 5) { | ||||||
|  | 					std::cout << "sending " << it->second.size() << " ops\n"; | ||||||
|  | 					sync_model->SendOps(_cs.contactHandle(c), it->first, it->second); | ||||||
|  | 					it = op_comp.ops.erase(it); | ||||||
|  | 					//sending_ops = true; | ||||||
|  | 				} else { | ||||||
|  | 					std::vector<CRDTNotes::Doc::Op> tmp_ops {it->second.cbegin(), it->second.cbegin()+5}; | ||||||
|  | 					assert(tmp_ops.size() == 5); | ||||||
|  | 					sync_model->SendOps(_cs.contactHandle(c), it->first, tmp_ops); | ||||||
|  | 					it->second.erase(it->second.cbegin(), it->second.cbegin()+5); | ||||||
|  | 					sending_ops = true; | ||||||
|  | 				} | ||||||
|  |  | ||||||
|  | 				break; // single update only | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			if (op_comp.ops.empty()) { | ||||||
|  | 				empty_queue.push_back(c); | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		_cs.registry().remove<Components::OpSendQueue>(empty_queue.cbegin(), empty_queue.cend()); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return sending_ops ? 0.05f : 2.f; | ||||||
| } | } | ||||||
|  |  | ||||||
| CRDTNotes::Doc* CRDTNotesSync::getDoc(const CRDTNotes::DocID& doc_id) { | CRDTNotes::Doc* CRDTNotesSync::getDoc(const CRDTNotes::DocID& doc_id) { | ||||||
| @@ -203,7 +268,50 @@ void CRDTNotesSync::merge(const CRDTNotes::DocID& doc_id, std::string_view new_t | |||||||
| 	auto op_vec = doc_ptr->merge(new_text); | 	auto op_vec = doc_ptr->merge(new_text); | ||||||
| 	std::cout << "doc changed " << op_vec.size() << " ops generated\n"; | 	std::cout << "doc changed " << op_vec.size() << " ops generated\n"; | ||||||
|  |  | ||||||
| 	// USE OPS | 	// attach OpSendQueue to every contact | ||||||
|  | 	// needs to be placed at the contact with the sync model | ||||||
|  | 	entt::dense_set<Contact4> handled_contacts; | ||||||
|  | 	for (auto c : _docs_contacts.at(doc_id)) { | ||||||
|  | 		if (handled_contacts.contains(c)) { | ||||||
|  | 			continue; | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if (!c.all_of<CRDTNotesContactSyncModelI*>()) { | ||||||
|  |  | ||||||
|  | 			// TODO: this is a fallback, remove | ||||||
|  | 			if (c.all_of<Contact::Components::ParentOf>()) { | ||||||
|  | 				for (const auto child : c.get<Contact::Components::ParentOf>().subs) { | ||||||
|  | 					if (handled_contacts.contains(child)) { | ||||||
|  | 						continue; | ||||||
|  | 					} | ||||||
|  | 					if (c.registry()->all_of<Contact::Components::TagSelfStrong>(child)) { | ||||||
|  | 						continue; | ||||||
|  | 					} | ||||||
|  | 					if (!c.registry()->all_of<CRDTNotesContactSyncModelI*>(child)) { | ||||||
|  | 						std::cerr << "CRDTNotesSync error: fallback failed\n"; | ||||||
|  | 						continue; | ||||||
|  | 					} | ||||||
|  |  | ||||||
|  | 					auto& op_queue = c.registry()->get_or_emplace<Components::OpSendQueue>(child).ops[doc_id]; | ||||||
|  | 					if (op_queue.empty()) { | ||||||
|  | 						op_queue = op_vec; | ||||||
|  | 					} else { | ||||||
|  | 						op_queue.insert(op_queue.cend(), op_vec.cbegin(), op_vec.cend()); | ||||||
|  | 					} | ||||||
|  | 					handled_contacts.emplace(child); | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 			continue; // skip, not impl | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		auto& op_queue = c.get_or_emplace<Components::OpSendQueue>().ops[doc_id]; | ||||||
|  | 		if (op_queue.empty()) { | ||||||
|  | 			op_queue = op_vec; | ||||||
|  | 		} else { | ||||||
|  | 			op_queue.insert(op_queue.cend(), op_vec.cbegin(), op_vec.cend()); | ||||||
|  | 		} | ||||||
|  | 		handled_contacts.emplace(c); | ||||||
|  | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) { | void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) { | ||||||
| @@ -220,5 +328,31 @@ void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_op_range&& e) { | |||||||
| } | } | ||||||
|  |  | ||||||
| void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) { | void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) { | ||||||
|  | 	addDoc(e.doc_id, e.c); | ||||||
|  |  | ||||||
|  | 	if (e.ops.empty()) { | ||||||
|  | 		std::cerr << "CRDTNotesSync warning: got empty ops event/pkg\n"; | ||||||
|  | 		return; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// TODO: deduplicate ops ? | ||||||
|  | 	auto lock_opt = _notes.writeLockAquire(e.doc_id); | ||||||
|  | 	if (lock_opt) { | ||||||
|  | 		// TODO: perms n stuff | ||||||
|  | 		// TODO: check if seq missing | ||||||
|  | 		auto* doc_ptr = getDoc(e.doc_id); | ||||||
|  | 		// TODO: record every applied op and throw event, so eg gui can react better | ||||||
|  | 		// , or better yet, edit events in string space (imgui can consume them) | ||||||
|  | 		doc_ptr->apply(e.ops); | ||||||
|  |  | ||||||
|  | 		// TODO: check if new frontier | ||||||
|  | 	} else { | ||||||
|  | 		auto& op_in_vec = _docs_incoming_ops[e.doc_id]; | ||||||
|  | 		if (op_in_vec.empty()) { | ||||||
|  | 			op_in_vec = e.ops; | ||||||
|  | 		} else { | ||||||
|  | 			op_in_vec.insert(op_in_vec.cend(), e.ops.cbegin(), e.ops.cend()); | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -98,6 +98,9 @@ class CRDTNotesSync final : public CRDTNotesEventI { | |||||||
| 	}; | 	}; | ||||||
| 	std::unordered_map<CRDTNotes::DocID, std::map<ContactHandle4, Peer>> _docs_peers; | 	std::unordered_map<CRDTNotes::DocID, std::map<ContactHandle4, Peer>> _docs_peers; | ||||||
|  |  | ||||||
|  | 	// queue of unapplied ops, kept here until write lock can be aquired | ||||||
|  | 	std::unordered_map<CRDTNotes::DocID, std::vector<CRDTNotes::Doc::Op>> _docs_incoming_ops; | ||||||
|  |  | ||||||
| 	// if a doc is eg new, it is added here | 	// if a doc is eg new, it is added here | ||||||
| 	std::set<CRDTNotes::DocID> _gossip_queue; // TODO: no | 	std::set<CRDTNotes::DocID> _gossip_queue; // TODO: no | ||||||
| 	std::set<CRDTNotes::DocID> _fetch_frontier_queue; | 	std::set<CRDTNotes::DocID> _fetch_frontier_queue; | ||||||
|   | |||||||
| @@ -9,7 +9,6 @@ | |||||||
| #include <imgui.h> | #include <imgui.h> | ||||||
| #include <misc/cpp/imgui_stdlib.h> | #include <misc/cpp/imgui_stdlib.h> | ||||||
|  |  | ||||||
| #include <iostream> |  | ||||||
| #include <cassert> | #include <cassert> | ||||||
|  |  | ||||||
| namespace detail { | namespace detail { | ||||||
| @@ -48,6 +47,12 @@ namespace detail { | |||||||
| } // detail | } // detail | ||||||
|  |  | ||||||
|  |  | ||||||
|  | std::unordered_set<CRDTNotes::DocWriteLock>::iterator CRDTNotesImGui::findLock(const CRDTNotes::DocID& doc_id) { | ||||||
|  | 	auto it = _held_locks.begin(); | ||||||
|  | 	for (; it != _held_locks.end() && it->id != doc_id; it++) {} | ||||||
|  | 	return it; | ||||||
|  | } | ||||||
|  |  | ||||||
| CRDTNotesImGui::CRDTNotesImGui(CRDTNotes& notes, CRDTNotesSync& notes_sync, ContactStore4I& cs) : _notes(notes), _notes_sync(notes_sync), _cs(cs) { | CRDTNotesImGui::CRDTNotesImGui(CRDTNotes& notes, CRDTNotesSync& notes_sync, ContactStore4I& cs) : _notes(notes), _notes_sync(notes_sync), _cs(cs) { | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -142,17 +147,36 @@ bool CRDTNotesImGui::renderDoc(const CRDTNotes::DocID& doc_id) { | |||||||
| 		return false; | 		return false; | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	auto lock_it = findLock(doc_id); | ||||||
|  | 	bool self_held = lock_it != _held_locks.end(); | ||||||
|  | 	const bool foreign_held = !self_held && _notes.isWriteLocked(doc_id); | ||||||
|  |  | ||||||
| 	auto text = doc->getText(); | 	auto text = doc->getText(); | ||||||
| 	if (renderDocText(text)) { | 	ImGui::InputTextMultiline( | ||||||
|  | 		"##doc", | ||||||
|  | 		&text, | ||||||
|  | 		{-1,-1}, | ||||||
|  | 			ImGuiInputTextFlags_AllowTabInput | | ||||||
|  | 			(foreign_held ? ImGuiInputTextFlags_ReadOnly : ImGuiInputTextFlags_None) | | ||||||
|  | 			ImGuiInputTextFlags_CallbackAlways | ||||||
|  | 		//cb, | ||||||
|  | 		//&text | ||||||
|  | 	); | ||||||
|  | 	if (!foreign_held && !self_held && (ImGui::IsItemActive() || ImGui::IsItemEdited())) { | ||||||
|  | 		// TODO: check | ||||||
|  | 		_held_locks.emplace(_notes.writeLockAquire(doc_id).value()); | ||||||
|  | 		self_held = true; | ||||||
|  | 		//std::cout << "!!!! imgui lock aquired\n"; | ||||||
|  | 	} else if (!foreign_held && self_held && !(ImGui::IsItemActive() || ImGui::IsItemEdited())) { | ||||||
|  | 		// release lock | ||||||
|  | 		_held_locks.erase(lock_it); | ||||||
|  | 		//std::cout << "!!!! imgui lock released\n"; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if (self_held && ImGui::IsItemEdited()) { | ||||||
| 		_notes_sync.merge(doc_id, text); | 		_notes_sync.merge(doc_id, text); | ||||||
| 		return true; | 		return true; | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return false; | 	return false; | ||||||
| } | } | ||||||
|  |  | ||||||
| bool CRDTNotesImGui::renderDocText(std::string& text) const { |  | ||||||
| 	// TODO: replace with text editor (zep) or visualize stuff?? |  | ||||||
| 	return ImGui::InputTextMultiline("##doc", &text, {-1,-1}, ImGuiInputTextFlags_AllowTabInput); |  | ||||||
| } |  | ||||||
|  |  | ||||||
|   | |||||||
| @@ -4,6 +4,7 @@ | |||||||
| #include <solanaceae/contact/fwd.hpp> | #include <solanaceae/contact/fwd.hpp> | ||||||
|  |  | ||||||
| #include <set> | #include <set> | ||||||
|  | #include <unordered_set> | ||||||
|  |  | ||||||
| class CRDTNotesImGui { | class CRDTNotesImGui { | ||||||
| 	CRDTNotes& _notes; | 	CRDTNotes& _notes; | ||||||
| @@ -13,6 +14,9 @@ class CRDTNotesImGui { | |||||||
| 	bool _show_global_list {true}; | 	bool _show_global_list {true}; | ||||||
|  |  | ||||||
| 	std::set<CRDTNotes::DocID> _open_docs; | 	std::set<CRDTNotes::DocID> _open_docs; | ||||||
|  | 	std::unordered_set<CRDTNotes::DocWriteLock> _held_locks; | ||||||
|  |  | ||||||
|  | 	std::unordered_set<CRDTNotes::DocWriteLock>::iterator findLock(const CRDTNotes::DocID& doc_id); | ||||||
|  |  | ||||||
| 	public: | 	public: | ||||||
| 		CRDTNotesImGui(CRDTNotes& notes, CRDTNotesSync& notes_sync, ContactStore4I& cs); | 		CRDTNotesImGui(CRDTNotes& notes, CRDTNotesSync& notes_sync, ContactStore4I& cs); | ||||||
| @@ -22,6 +26,5 @@ class CRDTNotesImGui { | |||||||
| 		bool renderContactListContactSmall(const Contact4 c, const bool selected) const; | 		bool renderContactListContactSmall(const Contact4 c, const bool selected) const; | ||||||
|  |  | ||||||
| 		bool renderDoc(const CRDTNotes::DocID& doc_id); | 		bool renderDoc(const CRDTNotes::DocID& doc_id); | ||||||
| 		bool renderDocText(std::string& text) const; |  | ||||||
| }; | }; | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user