From e0938690c7314ebc195e2a38f597d1c8da95f908 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Mon, 19 Dec 2022 17:23:45 +0100 Subject: [PATCH] main thread com --- vim_research/test2.cpp | 68 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 9 deletions(-) diff --git a/vim_research/test2.cpp b/vim_research/test2.cpp index b74b33a..bda49be 100644 --- a/vim_research/test2.cpp +++ b/vim_research/test2.cpp @@ -10,6 +10,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -282,14 +283,21 @@ struct SharedContext { ToxPubKey agent; std::promise agent_set; - // TODO: this is inefficent - std::mutex command_lists_mutex; - std::unordered_map> command_lists; - // remote op queue for receive - // local op list for remote lookups + std::mutex command_lists_mutex; // for list and frontier!! + std::unordered_map> command_lists; + std::unordered_map command_frontier; // last applied seq - // last seq for all known agents - // bool dirty + // contains remote changes we can apply in the main thread + std::mutex staging_mutex; + std::unordered_map staging_frontier; // last seq we have in command_lists, via tox + // (can be lower then command_frontier for local agent + + // contains remote changes with missing parent seq + // could merge into comamnd_lists + std::unordered_map> buffer; + + std::atomic_bool should_gossip_local; // local changes (set by main thread, reset by tox thread) + std::unordered_set should_gossip_remote; // list of ids we have new seq for (only modified by tox thread) Tox* tox {nullptr}; bool tox_dht_online {false}; @@ -627,9 +635,31 @@ int main(void) { // apply changes (some) and gen vim inserts std::cout << "got fetch changes\n"; - auto j_res_line_list = nlohmann::json::array(); + bool changes {false}; + { // apply changes + // TODO: make less locky, we dont need to hold both at the same time etc + //std::lock_guard lg {ctx.staging_mutex}; + std::scoped_lock sl {ctx.staging_mutex, ctx.command_lists_mutex}; - if (true) { // external changes + // TODO: limit number of ops for responsiveness + for (const auto& [agent, staging_seq] : ctx.staging_frontier) { + // check if remote newer + if (!ctx.command_frontier.count(agent) || ctx.command_frontier.at(agent) < staging_seq) { + for (uint64_t seq = ctx.command_frontier[agent]; seq <= staging_seq; seq++) { + + // !! this can get expensive, while we are holding locks :/ + bool r = doc.apply(ctx.command_lists.at(agent).at(seq).ops); + assert(r && "this should not happen"); + } + + ctx.command_frontier[agent] = staging_seq; + changes = true; + } + } + } + + auto j_res_line_list = nlohmann::json::array(); + if (changes) { // external changes const auto crdt_text = doc.getText(); std::string_view text_view {crdt_text}; for (int64_t i = 1; ; i++) { @@ -692,6 +722,26 @@ int main(void) { std::cout << "\n"; } assert(doc.getText() == new_text); + + // TODO: make something less locky + // TODO: make method + { + std::lock_guard mg{ctx.command_lists_mutex}; + assert(ctx.command_lists.size() == ctx.command_frontier.size()); + + auto& local_command_list = ctx.command_lists[ctx.agent]; + uint64_t seq {0}; + if (ctx.command_frontier.count(ctx.agent)) { // get last own seq + seq = ctx.command_frontier[ctx.agent]++; + } + local_command_list.emplace(seq, Command{ + ctx.agent, + seq, + ops + }); + ctx.command_frontier[ctx.agent] = seq; + } + ctx.should_gossip_local.store(true); } else { std::cout << "unknown command '" << command << "'\n"; }