From cb0c2642f872874a74ea239930fea49795926267 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Tue, 20 Dec 2022 02:23:54 +0100 Subject: [PATCH] works, somewhat --- vim_research/test2.cpp | 105 ++++++++++++++++++++++++++++++----------- 1 file changed, 77 insertions(+), 28 deletions(-) diff --git a/vim_research/test2.cpp b/vim_research/test2.cpp index 7116016..35422bd 100644 --- a/vim_research/test2.cpp +++ b/vim_research/test2.cpp @@ -484,7 +484,44 @@ void toxThread(SharedContext* ctx) { std::cout << "tox connected to group\n"; } } else { // do the thing - // pump from buffer to staging + { // pump from buffer to staging + const size_t max_commands = 1; + size_t number_of_commands_done = 0; + std::lock_guard lg_staging{ctx->staging_mutex}; + for (auto& [agent, buffer] : ctx->buffer) { + if (agent == agent_local) { + // skip ? self + continue; + } + if (number_of_commands_done >= max_commands) { + break; + } + + // determain the seq we are looking for in buffer + uint64_t seq {0}; + if (ctx->staging_frontier.count(agent)) { + seq = ctx->staging_frontier.at(agent) + 1; + } + + if (!buffer.count(seq)) { // not in buffer, skip + continue; + } + + // this can lead to dead locks, if other code does this wrong + std::lock_guard lg{ctx->command_lists_mutex}; + for (; buffer.count(seq); seq++) { + ctx->command_lists[agent][seq] = buffer.at(seq); + + number_of_commands_done++; + if (number_of_commands_done >= max_commands) { + break; + } + } + + ctx->staging_frontier[agent] = seq; + ctx->should_gossip_remote.emplace(agent); + } + } // request missing in buffer @@ -782,10 +819,20 @@ int main(void) { for (const auto& [agent, staging_seq] : ctx.staging_frontier) { // check if remote newer if (!ctx.command_frontier.count(agent) || ctx.command_frontier.at(agent) < staging_seq) { - for (uint64_t seq = ctx.command_frontier[agent]; seq <= staging_seq; seq++) { + uint64_t seq {0}; + if (ctx.command_frontier.count(agent)) { + seq = ctx.command_frontier[agent] + 1; + } + for (; seq <= staging_seq; seq++) { // !! this can get expensive, while we are holding locks :/ bool r = doc.apply(ctx.command_lists.at(agent).at(seq).ops); + if (!r) { + std::cout << "faild to apply:\n"; + for (const auto& op : ctx.command_lists.at(agent).at(seq).ops) { + std::cout << " " << op << "\n"; + } + } assert(r && "this should not happen"); } @@ -844,10 +891,10 @@ int main(void) { //std::cout << "new_text:\n" << new_text << "\n"; //std::cout << "old_text:\n" << doc.getText() << "\n"; std::cout << "doc state: "; - for (const auto& e : doc.state.list) { - std::cout << e << " "; - } - std::cout << "\n"; + for (const auto& e : doc.state.list) { + std::cout << e << " "; + } + std::cout << "\n"; const auto ops = doc.merge(new_text); if (!ops.empty()) { @@ -860,32 +907,34 @@ int main(void) { } assert(doc.getText() == new_text); - // TODO: make something less locky - // TODO: make method - { - std::lock_guard mg{ctx.command_lists_mutex}; - assert(ctx.command_lists.size() == ctx.command_frontier.size()); + if (!ops.empty()) { + // TODO: make something less locky + // TODO: make method + { + std::lock_guard mg{ctx.command_lists_mutex}; + //assert(ctx.command_lists.size() == ctx.command_frontier.size()); - auto& local_command_list = ctx.command_lists[ctx.agent]; + auto& local_command_list = ctx.command_lists[ctx.agent]; - uint64_t seq {0}; - if (ctx.command_frontier.count(ctx.agent)) { // get last own seq - seq = ctx.command_frontier[ctx.agent] + 1; - } - const size_t max_ops {5}; // limit ops per command so we can fit them into packets - for (size_t i = 0; i < ops.size(); i+=max_ops, seq++) { - std::vector tmp_ops {ops.cbegin()+i, ops.cbegin()+i+1}; - assert(!tmp_ops.empty()); - - local_command_list.emplace(seq, Command{ - ctx.agent, - seq, - tmp_ops - }); - ctx.command_frontier[ctx.agent] = seq; + uint64_t seq {0}; + if (ctx.command_frontier.count(ctx.agent)) { // get last own seq + seq = ctx.command_frontier[ctx.agent] + 1; + } + const size_t max_ops {5}; // limit ops per command so we can fit them into packets + for (size_t i = 0; i < ops.size(); i+=max_ops, seq++) { + std::vector tmp_ops {ops.cbegin()+i, ops.cbegin()+i+1}; + assert(!tmp_ops.empty()); + + local_command_list.emplace(seq, Command{ + ctx.agent, + seq, + tmp_ops + }); + ctx.command_frontier[ctx.agent] = seq; + } } + ctx.should_gossip_local.store(true); } - ctx.should_gossip_local.store(true); } else { std::cout << "unknown command '" << command << "'\n"; }