From 05fd0940eacd24c556ff493219cb3f2fdda6aa3d Mon Sep 17 00:00:00 2001 From: Green Sky Date: Tue, 20 Dec 2022 19:15:32 +0100 Subject: [PATCH] forgot to clear buffer --- vim_research/test2.cpp | 74 +++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 29 deletions(-) diff --git a/vim_research/test2.cpp b/vim_research/test2.cpp index db43e4e..2c94bdc 100644 --- a/vim_research/test2.cpp +++ b/vim_research/test2.cpp @@ -515,39 +515,55 @@ void toxThread(SharedContext* ctx) { { // pump from buffer to staging const size_t max_commands = 1; size_t number_of_commands_done = 0; - std::lock_guard lg_staging{ctx->staging_mutex}; - for (auto& [agent, buffer] : ctx->buffer) { - if (agent == agent_local) { - // skip ? self - continue; - } - if (number_of_commands_done >= max_commands) { - break; - } - - // determain the seq we are looking for in buffer - uint64_t seq {0}; - if (ctx->staging_frontier.count(agent)) { - seq = ctx->staging_frontier.at(agent) + 1; - } - - if (!buffer.count(seq)) { // not in buffer, skip - continue; - } - - // this can lead to dead locks, if other code does this wrong - std::lock_guard lg{ctx->command_lists_mutex}; - for (; buffer.count(seq); seq++) { - ctx->command_lists[agent][seq] = buffer.at(seq); - - number_of_commands_done++; + std::vector empty_buffers; + { + std::lock_guard lg_staging{ctx->staging_mutex}; + for (auto& [agent, buffer] : ctx->buffer) { + if (agent == agent_local) { + // skip ? self + continue; + } if (number_of_commands_done >= max_commands) { break; } - } - ctx->staging_frontier[agent] = seq; - ctx->should_gossip_remote.emplace(agent); + // determain the seq we are looking for in buffer + uint64_t seq {0}; + if (ctx->staging_frontier.count(agent)) { + seq = ctx->staging_frontier.at(agent) + 1; + } + + if (!buffer.count(seq)) { // not in buffer, skip + continue; + } + + std::vector seq_to_remove; + { // this can lead to dead locks, if other code does this wrong + std::lock_guard lg{ctx->command_lists_mutex}; + for (; buffer.count(seq); seq++) { + ctx->command_lists[agent][seq] = buffer.at(seq); + seq_to_remove.push_back(seq); + + number_of_commands_done++; + if (number_of_commands_done >= max_commands) { + break; + } + } + + ctx->staging_frontier[agent] = seq; + } + ctx->should_gossip_remote.emplace(agent); + + for (const auto key : seq_to_remove) { + buffer.erase(key); + } + if (buffer.empty()) { + empty_buffers.push_back(agent); + } + } + } // scope for staging lock + for (const auto& agent : empty_buffers) { + ctx->buffer.erase(agent); } }