main thread com

This commit is contained in:
Green Sky 2022-12-19 17:23:45 +01:00
parent 2c10e258c0
commit e0938690c7
No known key found for this signature in database
1 changed files with 59 additions and 9 deletions

View File

@ -10,6 +10,7 @@ extern "C" {
#include <optional>
#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <string_view>
#include <variant>
#include <thread>
@ -282,14 +283,21 @@ struct SharedContext {
ToxPubKey agent;
std::promise<void> agent_set;
// TODO: this is inefficent
std::mutex command_lists_mutex;
std::unordered_map<ToxPubKey, std::vector<Command>> command_lists;
// remote op queue for receive
// local op list for remote lookups
std::mutex command_lists_mutex; // for list and frontier!!
std::unordered_map<ToxPubKey, std::unordered_map<uint64_t, Command>> command_lists;
std::unordered_map<ToxPubKey, uint64_t> command_frontier; // last applied seq
// last seq for all known agents
// bool dirty
// contains remote changes we can apply in the main thread
std::mutex staging_mutex;
std::unordered_map<ToxPubKey, uint64_t> staging_frontier; // last seq we have in command_lists, via tox
// (can be lower then command_frontier for local agent
// contains remote changes with missing parent seq
// could merge into comamnd_lists
std::unordered_map<ToxPubKey, std::unordered_map<uint64_t, Command>> buffer;
std::atomic_bool should_gossip_local; // local changes (set by main thread, reset by tox thread)
std::unordered_set<ToxPubKey> should_gossip_remote; // list of ids we have new seq for (only modified by tox thread)
Tox* tox {nullptr};
bool tox_dht_online {false};
@ -627,9 +635,31 @@ int main(void) {
// apply changes (some) and gen vim inserts
std::cout << "got fetch changes\n";
auto j_res_line_list = nlohmann::json::array();
bool changes {false};
{ // apply changes
// TODO: make less locky, we dont need to hold both at the same time etc
//std::lock_guard lg {ctx.staging_mutex};
std::scoped_lock sl {ctx.staging_mutex, ctx.command_lists_mutex};
if (true) { // external changes
// TODO: limit number of ops for responsiveness
for (const auto& [agent, staging_seq] : ctx.staging_frontier) {
// check if remote newer
if (!ctx.command_frontier.count(agent) || ctx.command_frontier.at(agent) < staging_seq) {
for (uint64_t seq = ctx.command_frontier[agent]; seq <= staging_seq; seq++) {
// !! this can get expensive, while we are holding locks :/
bool r = doc.apply(ctx.command_lists.at(agent).at(seq).ops);
assert(r && "this should not happen");
}
ctx.command_frontier[agent] = staging_seq;
changes = true;
}
}
}
auto j_res_line_list = nlohmann::json::array();
if (changes) { // external changes
const auto crdt_text = doc.getText();
std::string_view text_view {crdt_text};
for (int64_t i = 1; ; i++) {
@ -692,6 +722,26 @@ int main(void) {
std::cout << "\n";
}
assert(doc.getText() == new_text);
// TODO: make something less locky
// TODO: make method
{
std::lock_guard mg{ctx.command_lists_mutex};
assert(ctx.command_lists.size() == ctx.command_frontier.size());
auto& local_command_list = ctx.command_lists[ctx.agent];
uint64_t seq {0};
if (ctx.command_frontier.count(ctx.agent)) { // get last own seq
seq = ctx.command_frontier[ctx.agent]++;
}
local_command_list.emplace(seq, Command{
ctx.agent,
seq,
ops
});
ctx.command_frontier[ctx.agent] = seq;
}
ctx.should_gossip_local.store(true);
} else {
std::cout << "unknown command '" << command << "'\n";
}