Compare commits
	
		
			3 Commits
		
	
	
		
			396a03c229
			...
			31f5adfcc0
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 31f5adfcc0 | ||
|  | 95f3c821a5 | ||
|  | 9cab041af4 | 
| @@ -37,3 +37,20 @@ CRDTNotes::Doc* CRDTNotes::addDoc(const CRDTAgent& self_agent, const DocID& id) | ||||
| 	return &doc; | ||||
| } | ||||
|  | ||||
| void CRDTNotes::writeLockRelease(const DocID& id) { | ||||
| 	assert(_doc_write_locks.count(id) > 0); | ||||
| 	_doc_write_locks.erase(id); | ||||
| } | ||||
|  | ||||
| bool CRDTNotes::isWriteLocked(const DocID& id) const { | ||||
| 	return _doc_write_locks.count(id); | ||||
| } | ||||
|  | ||||
| std::optional<CRDTNotes::DocWriteLock> CRDTNotes::writeLockAquire(const DocID& id) { | ||||
| 	if (_doc_write_locks.count(id)) { | ||||
| 		return std::nullopt; // replace with exception instead? | ||||
| 	} | ||||
|  | ||||
| 	_doc_write_locks.emplace(id); | ||||
| 	return DocWriteLock{*this, id}; | ||||
| } | ||||
|   | ||||
| @@ -6,6 +6,8 @@ | ||||
| #include <cstdint> | ||||
| #include <functional> | ||||
| #include <unordered_map> | ||||
| #include <unordered_set> | ||||
| #include <optional> | ||||
|  | ||||
| using ID32 = std::array<uint8_t, 32>; | ||||
|  | ||||
| @@ -37,9 +39,23 @@ class CRDTNotes { | ||||
| 			uint64_t seq{0}; | ||||
| 		}; | ||||
|  | ||||
| 		// RAII lock wrapper | ||||
| 		struct DocWriteLock { | ||||
| 			CRDTNotes* notes; | ||||
| 			DocID id; | ||||
|  | ||||
| 			// ctr assumes lock | ||||
| 			DocWriteLock(CRDTNotes& notes, const DocID& id) : notes(¬es), id(id) {} | ||||
| 			DocWriteLock(const DocWriteLock&) = delete; | ||||
| 			DocWriteLock(DocWriteLock&& other) : notes(other.notes), id(other.id) { other.notes = nullptr; } | ||||
| 			~DocWriteLock(void) { if (notes) { notes->writeLockRelease(id); } } | ||||
| 			bool operator==(const DocWriteLock& other) const { return id == other.id; } | ||||
| 		}; | ||||
|  | ||||
| 	private: | ||||
| 		// TODO: add metadata to docs | ||||
| 		std::unordered_map<DocID, Doc> _docs; | ||||
| 		std::unordered_set<DocID> _doc_write_locks; | ||||
|  | ||||
| 	public: | ||||
| 		// config? | ||||
| @@ -52,5 +68,16 @@ class CRDTNotes { | ||||
| 		Doc* getDoc(const DocID& id); | ||||
|  | ||||
| 		Doc* addDoc(const CRDTAgent& self_agent, const DocID& doc); | ||||
|  | ||||
| 		void writeLockRelease(const DocID& id); | ||||
| 		bool isWriteLocked(const DocID& id) const; | ||||
| 		std::optional<DocWriteLock> writeLockAquire(const DocID& id); | ||||
| }; | ||||
|  | ||||
| template<> | ||||
| struct std::hash<CRDTNotes::DocWriteLock> { | ||||
| 	std::uint64_t operator()(const CRDTNotes::DocWriteLock& s) const noexcept { | ||||
| 		return std::hash<ID32>{}(s.id); | ||||
| 	} | ||||
| }; | ||||
|  | ||||
|   | ||||
| @@ -5,10 +5,23 @@ | ||||
|  | ||||
| #include <solanaceae/contact/components.hpp> | ||||
|  | ||||
| #include <entt/container/dense_set.hpp> | ||||
|  | ||||
| #include <cstdint> | ||||
| #include <vector> | ||||
| #include <iostream> | ||||
|  | ||||
| namespace Components { | ||||
|  | ||||
| // attached to contact | ||||
| struct OpSendQueue { | ||||
| 	std::map<CRDTNotes::DocID, std::vector<CRDTNotes::Doc::Op>> ops; | ||||
| 	// HACK: limit to 5 ops per packet for now | ||||
| 	// TODO: ft based alternative for >5 ops | ||||
| }; | ||||
|  | ||||
| } // Components | ||||
|  | ||||
| static ID32 id_from_vec(const std::vector<uint8_t>& vec) { | ||||
| 	ID32 new_id; | ||||
| 	for (size_t i = 0; i < new_id.size() && i < vec.size(); i++) { | ||||
| @@ -27,6 +40,25 @@ CRDTNotesSync::~CRDTNotesSync(void) { | ||||
| } | ||||
|  | ||||
| float CRDTNotesSync::iterate(float time_delta) { | ||||
| 	for (auto doc_it = _docs_incoming_ops.begin(); doc_it != _docs_incoming_ops.end();) { | ||||
| 		if (_notes.isWriteLocked(doc_it->first)) { | ||||
| 			doc_it++; | ||||
| 			continue; | ||||
| 		} | ||||
|  | ||||
| 		auto lock_opt = _notes.writeLockAquire(doc_it->first); | ||||
| 		assert(lock_opt); | ||||
|  | ||||
| 		auto* doc_ptr = getDoc(doc_it->first); | ||||
| 		// TODO: record every applied op and throw event, so eg gui can react better | ||||
| 		// , or better yet, edit events in string space (imgui can consume them) | ||||
| 		doc_ptr->apply(doc_it->second); | ||||
|  | ||||
| 		std::cout << "CRDTNotesSync: applied " << doc_it->second.size() << " ops\n"; | ||||
|  | ||||
| 		doc_it = _docs_incoming_ops.erase(doc_it); | ||||
| 	} | ||||
|  | ||||
| 	if (!_gossip_queue.empty()) { | ||||
| 		// TODO: set is sorted by id, not by order added | ||||
| 		// only one per iterate *should* be enough | ||||
| @@ -89,7 +121,40 @@ float CRDTNotesSync::iterate(float time_delta) { | ||||
| 		_fetch_frontier_queue.erase(it); | ||||
| 	} | ||||
|  | ||||
| 	return 2.f; | ||||
| 	bool sending_ops {false}; | ||||
| 	{ // send ops in queue | ||||
| 		std::vector<Contact4> empty_queue; | ||||
| 		for (const auto& [c, op_comp, sync_model] : _cs.registry().view<Components::OpSendQueue, CRDTNotesContactSyncModelI*>().each()) { | ||||
| 			// HACK: one pkg with up to 5 ops per tick per peer | ||||
| 			//for (const auto& [doc_id, op_vec] : op_comp.ops) { | ||||
| 			for (auto it = op_comp.ops.begin(); it != op_comp.ops.end();) { | ||||
| 				if (it->second.empty()) { | ||||
| 					it = op_comp.ops.erase(it); | ||||
| 					continue; | ||||
| 				} else if (it->second.size() <= 5) { | ||||
| 					std::cout << "sending " << it->second.size() << " ops\n"; | ||||
| 					sync_model->SendOps(_cs.contactHandle(c), it->first, it->second); | ||||
| 					it = op_comp.ops.erase(it); | ||||
| 					//sending_ops = true; | ||||
| 				} else { | ||||
| 					std::vector<CRDTNotes::Doc::Op> tmp_ops {it->second.cbegin(), it->second.cbegin()+5}; | ||||
| 					assert(tmp_ops.size() == 5); | ||||
| 					sync_model->SendOps(_cs.contactHandle(c), it->first, tmp_ops); | ||||
| 					it->second.erase(it->second.cbegin(), it->second.cbegin()+5); | ||||
| 					sending_ops = true; | ||||
| 				} | ||||
|  | ||||
| 				break; // single update only | ||||
| 			} | ||||
|  | ||||
| 			if (op_comp.ops.empty()) { | ||||
| 				empty_queue.push_back(c); | ||||
| 			} | ||||
| 		} | ||||
| 		_cs.registry().remove<Components::OpSendQueue>(empty_queue.cbegin(), empty_queue.cend()); | ||||
| 	} | ||||
|  | ||||
| 	return sending_ops ? 0.05f : 2.f; | ||||
| } | ||||
|  | ||||
| CRDTNotes::Doc* CRDTNotesSync::getDoc(const CRDTNotes::DocID& doc_id) { | ||||
| @@ -203,7 +268,50 @@ void CRDTNotesSync::merge(const CRDTNotes::DocID& doc_id, std::string_view new_t | ||||
| 	auto op_vec = doc_ptr->merge(new_text); | ||||
| 	std::cout << "doc changed " << op_vec.size() << " ops generated\n"; | ||||
|  | ||||
| 	// USE OPS | ||||
| 	// attach OpSendQueue to every contact | ||||
| 	// needs to be placed at the contact with the sync model | ||||
| 	entt::dense_set<Contact4> handled_contacts; | ||||
| 	for (auto c : _docs_contacts.at(doc_id)) { | ||||
| 		if (handled_contacts.contains(c)) { | ||||
| 			continue; | ||||
| 		} | ||||
|  | ||||
| 		if (!c.all_of<CRDTNotesContactSyncModelI*>()) { | ||||
|  | ||||
| 			// TODO: this is a fallback, remove | ||||
| 			if (c.all_of<Contact::Components::ParentOf>()) { | ||||
| 				for (const auto child : c.get<Contact::Components::ParentOf>().subs) { | ||||
| 					if (handled_contacts.contains(child)) { | ||||
| 						continue; | ||||
| 					} | ||||
| 					if (c.registry()->all_of<Contact::Components::TagSelfStrong>(child)) { | ||||
| 						continue; | ||||
| 					} | ||||
| 					if (!c.registry()->all_of<CRDTNotesContactSyncModelI*>(child)) { | ||||
| 						std::cerr << "CRDTNotesSync error: fallback failed\n"; | ||||
| 						continue; | ||||
| 					} | ||||
|  | ||||
| 					auto& op_queue = c.registry()->get_or_emplace<Components::OpSendQueue>(child).ops[doc_id]; | ||||
| 					if (op_queue.empty()) { | ||||
| 						op_queue = op_vec; | ||||
| 					} else { | ||||
| 						op_queue.insert(op_queue.cend(), op_vec.cbegin(), op_vec.cend()); | ||||
| 					} | ||||
| 					handled_contacts.emplace(child); | ||||
| 				} | ||||
| 			} | ||||
| 			continue; // skip, not impl | ||||
| 		} | ||||
|  | ||||
| 		auto& op_queue = c.get_or_emplace<Components::OpSendQueue>().ops[doc_id]; | ||||
| 		if (op_queue.empty()) { | ||||
| 			op_queue = op_vec; | ||||
| 		} else { | ||||
| 			op_queue.insert(op_queue.cend(), op_vec.cbegin(), op_vec.cend()); | ||||
| 		} | ||||
| 		handled_contacts.emplace(c); | ||||
| 	} | ||||
| } | ||||
|  | ||||
| void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) { | ||||
| @@ -220,5 +328,31 @@ void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_op_range&& e) { | ||||
| } | ||||
|  | ||||
| void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) { | ||||
| 	addDoc(e.doc_id, e.c); | ||||
|  | ||||
| 	if (e.ops.empty()) { | ||||
| 		std::cerr << "CRDTNotesSync warning: got empty ops event/pkg\n"; | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	// TODO: deduplicate ops ? | ||||
| 	auto lock_opt = _notes.writeLockAquire(e.doc_id); | ||||
| 	if (lock_opt) { | ||||
| 		// TODO: perms n stuff | ||||
| 		// TODO: check if seq missing | ||||
| 		auto* doc_ptr = getDoc(e.doc_id); | ||||
| 		// TODO: record every applied op and throw event, so eg gui can react better | ||||
| 		// , or better yet, edit events in string space (imgui can consume them) | ||||
| 		doc_ptr->apply(e.ops); | ||||
|  | ||||
| 		// TODO: check if new frontier | ||||
| 	} else { | ||||
| 		auto& op_in_vec = _docs_incoming_ops[e.doc_id]; | ||||
| 		if (op_in_vec.empty()) { | ||||
| 			op_in_vec = e.ops; | ||||
| 		} else { | ||||
| 			op_in_vec.insert(op_in_vec.cend(), e.ops.cbegin(), e.ops.cend()); | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -98,6 +98,9 @@ class CRDTNotesSync final : public CRDTNotesEventI { | ||||
| 	}; | ||||
| 	std::unordered_map<CRDTNotes::DocID, std::map<ContactHandle4, Peer>> _docs_peers; | ||||
|  | ||||
| 	// queue of unapplied ops, kept here until write lock can be aquired | ||||
| 	std::unordered_map<CRDTNotes::DocID, std::vector<CRDTNotes::Doc::Op>> _docs_incoming_ops; | ||||
|  | ||||
| 	// if a doc is eg new, it is added here | ||||
| 	std::set<CRDTNotes::DocID> _gossip_queue; // TODO: no | ||||
| 	std::set<CRDTNotes::DocID> _fetch_frontier_queue; | ||||
|   | ||||
| @@ -9,7 +9,6 @@ | ||||
| #include <imgui.h> | ||||
| #include <misc/cpp/imgui_stdlib.h> | ||||
|  | ||||
| #include <iostream> | ||||
| #include <cassert> | ||||
|  | ||||
| namespace detail { | ||||
| @@ -48,6 +47,12 @@ namespace detail { | ||||
| } // detail | ||||
|  | ||||
|  | ||||
| std::unordered_set<CRDTNotes::DocWriteLock>::iterator CRDTNotesImGui::findLock(const CRDTNotes::DocID& doc_id) { | ||||
| 	auto it = _held_locks.begin(); | ||||
| 	for (; it != _held_locks.end() && it->id != doc_id; it++) {} | ||||
| 	return it; | ||||
| } | ||||
|  | ||||
| CRDTNotesImGui::CRDTNotesImGui(CRDTNotes& notes, CRDTNotesSync& notes_sync, ContactStore4I& cs) : _notes(notes), _notes_sync(notes_sync), _cs(cs) { | ||||
| } | ||||
|  | ||||
| @@ -142,17 +147,36 @@ bool CRDTNotesImGui::renderDoc(const CRDTNotes::DocID& doc_id) { | ||||
| 		return false; | ||||
| 	} | ||||
|  | ||||
| 	auto lock_it = findLock(doc_id); | ||||
| 	bool self_held = lock_it != _held_locks.end(); | ||||
| 	const bool foreign_held = !self_held && _notes.isWriteLocked(doc_id); | ||||
|  | ||||
| 	auto text = doc->getText(); | ||||
| 	if (renderDocText(text)) { | ||||
| 	ImGui::InputTextMultiline( | ||||
| 		"##doc", | ||||
| 		&text, | ||||
| 		{-1,-1}, | ||||
| 			ImGuiInputTextFlags_AllowTabInput | | ||||
| 			(foreign_held ? ImGuiInputTextFlags_ReadOnly : ImGuiInputTextFlags_None) | | ||||
| 			ImGuiInputTextFlags_CallbackAlways | ||||
| 		//cb, | ||||
| 		//&text | ||||
| 	); | ||||
| 	if (!foreign_held && !self_held && (ImGui::IsItemActive() || ImGui::IsItemEdited())) { | ||||
| 		// TODO: check | ||||
| 		_held_locks.emplace(_notes.writeLockAquire(doc_id).value()); | ||||
| 		self_held = true; | ||||
| 		//std::cout << "!!!! imgui lock aquired\n"; | ||||
| 	} else if (!foreign_held && self_held && !(ImGui::IsItemActive() || ImGui::IsItemEdited())) { | ||||
| 		// release lock | ||||
| 		_held_locks.erase(lock_it); | ||||
| 		//std::cout << "!!!! imgui lock released\n"; | ||||
| 	} | ||||
|  | ||||
| 	if (self_held && ImGui::IsItemEdited()) { | ||||
| 		_notes_sync.merge(doc_id, text); | ||||
| 		return true; | ||||
| 	} | ||||
|  | ||||
| 	return false; | ||||
| } | ||||
|  | ||||
| bool CRDTNotesImGui::renderDocText(std::string& text) const { | ||||
| 	// TODO: replace with text editor (zep) or visualize stuff?? | ||||
| 	return ImGui::InputTextMultiline("##doc", &text, {-1,-1}, ImGuiInputTextFlags_AllowTabInput); | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -4,6 +4,7 @@ | ||||
| #include <solanaceae/contact/fwd.hpp> | ||||
|  | ||||
| #include <set> | ||||
| #include <unordered_set> | ||||
|  | ||||
| class CRDTNotesImGui { | ||||
| 	CRDTNotes& _notes; | ||||
| @@ -13,6 +14,9 @@ class CRDTNotesImGui { | ||||
| 	bool _show_global_list {true}; | ||||
|  | ||||
| 	std::set<CRDTNotes::DocID> _open_docs; | ||||
| 	std::unordered_set<CRDTNotes::DocWriteLock> _held_locks; | ||||
|  | ||||
| 	std::unordered_set<CRDTNotes::DocWriteLock>::iterator findLock(const CRDTNotes::DocID& doc_id); | ||||
|  | ||||
| 	public: | ||||
| 		CRDTNotesImGui(CRDTNotes& notes, CRDTNotesSync& notes_sync, ContactStore4I& cs); | ||||
| @@ -22,6 +26,5 @@ class CRDTNotesImGui { | ||||
| 		bool renderContactListContactSmall(const Contact4 c, const bool selected) const; | ||||
|  | ||||
| 		bool renderDoc(const CRDTNotes::DocID& doc_id); | ||||
| 		bool renderDocText(std::string& text) const; | ||||
| }; | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user