diff --git a/vim_research/test2.cpp b/vim_research/test2.cpp index 7d32acd..3ecc896 100644 --- a/vim_research/test2.cpp +++ b/vim_research/test2.cpp @@ -1,3 +1,4 @@ +#include "toxcore/tox.h" #include #include @@ -61,7 +62,7 @@ using Doc = GreenCRDT::TextDocument; using ListType = Doc::ListType; struct Command { - Agent actor; + Agent agent; uint64_t seq {0}; // independed of the ops inside, theoretically //... std::vector ops; @@ -129,7 +130,7 @@ namespace std { } // namespace std NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(Command, - actor, + agent, seq, ops ) @@ -296,8 +297,9 @@ struct SharedContext { // could merge into comamnd_lists std::unordered_map> buffer; - std::atomic_bool should_gossip_local; // local changes (set by main thread, reset by tox thread) + std::atomic_bool should_gossip_local{false}; // local changes (set by main thread, reset by tox thread) std::unordered_set should_gossip_remote; // list of ids we have new seq for (only modified by tox thread) + std::unordered_map heard_gossip; // seq frontiers we have heard about Tox* tox {nullptr}; bool tox_dht_online {false}; @@ -307,6 +309,51 @@ struct SharedContext { namespace tox { +namespace pkg { + + enum PKGID : uint8_t { + FRONTIER = 32, + REQUEST_FRONTIER, + + COMMAND, + REQUEST_COMMANDS, + }; + + // send the currently last known seq you have (excluding buffer) + struct Frontier { + Agent agent; + uint64_t seq{0}; + }; + + NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(Frontier, + agent, + seq + ) + + // request the last known seq another peer has for agent + struct RequestFrontier { + Agent agent; + }; + + NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestFrontier, + agent + ) + + using Command = ::Command; + + // request every command for agent after seq (inclusive) + struct RequestCommands { + Agent agent; + uint64_t seq{0}; + }; + + NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestCommands, + agent, + seq + ) + +} // namespace pkg + static std::vector hex2bin(const std::string& str) { std::vector bin{}; bin.resize(str.size()/2, 0); @@ -390,6 +437,9 @@ void toxThread(SharedContext* ctx) { } } + // "thread local" + Agent agent_local; + //tox_group_self_get_public_key() //tox_group_send_custom_packet() //tox_group_send_custom_private_packet() @@ -416,6 +466,7 @@ void toxThread(SharedContext* ctx) { ctx->agent_set.set_value(); return; // fuck everything } + agent_local = ctx->agent; ctx->agent_set.set_value(); } } else if (!ctx->tox_group_online) { // then wait for group to connect @@ -424,9 +475,66 @@ void toxThread(SharedContext* ctx) { std::cout << "tox connected to group\n"; } } else { // do the thing - // staging? + // pump from buffer to staging + + // request missing in buffer + + // request frontier (implicit list of agents) + // only every couple of second, can get large + // OR get back random agent and do it often + // handle requests - // send tip (prio self) + + { // gossip frontier + // for mutex locking simplicity this is an either-or + if (ctx->should_gossip_local.exchange(false)) { + pkg::Frontier f_pkg{ + agent_local, + 0u + }; + + pkg::Command c_pkg{ + agent_local, + 0u, + {} + }; + + { // lock + std::lock_guard lg{ctx->command_lists_mutex}; + assert(ctx->command_frontier.count(agent_local)); + + f_pkg.seq = ctx->command_frontier.at(agent_local); + + c_pkg = ctx->command_lists[agent_local][f_pkg.seq]; + } + + { // gossip + std::vector data = nlohmann::json::to_msgpack(f_pkg); + // prepend pkgid + data.emplace(data.begin(), static_cast(pkg::PKGID::FRONTIER)); + + if (!tox_group_send_custom_packet(ctx->tox, 0, true, data.data(), data.size(), nullptr)) { + std::cerr << "failed to send gossip packet of local agent\n"; + // TODO: set should_gossip_local back to true? + } else { + std::cout << "sent gossip of local agent\n"; + } + } + { // command + std::vector data = nlohmann::json::to_msgpack(c_pkg); + // prepend pkgid + data.emplace(data.begin(), static_cast(pkg::PKGID::COMMAND)); + + if (!tox_group_send_custom_packet(ctx->tox, 0, true, data.data(), data.size(), nullptr)) { + std::cerr << "failed to send command packet of local agent\n"; + } else { + std::cout << "sent command of local agent\n"; + } + } + } else if (!ctx->should_gossip_remote.empty()) { + std::lock_guard lg{ctx->command_lists_mutex}; + } + } } std::this_thread::sleep_for(20ms); @@ -751,7 +859,7 @@ int main(void) { auto& local_command_list = ctx.command_lists[ctx.agent]; uint64_t seq {0}; if (ctx.command_frontier.count(ctx.agent)) { // get last own seq - seq = ctx.command_frontier[ctx.agent]++; + seq = ctx.command_frontier[ctx.agent] + 1; } local_command_list.emplace(seq, Command{ ctx.agent, @@ -789,14 +897,66 @@ static void self_connection_status_cb(Tox*, TOX_CONNECTION connection_status, vo std::cout << "self_connection_status_cb " << connection_status << "\n"; } +static void handle_pkg(SharedContext& ctx, const uint8_t* data, size_t length) { + if (length < 2) { + std::cerr << "got too short pkg " << length << "\n"; + return; + } + + pkg::PKGID pkg_id = static_cast(data[0]); + const auto p_j = nlohmann::json::from_msgpack(data+1, data+1 + (length-1), true, false); + if (p_j.is_discarded()) { + std::cerr << "got invalid msgpack for " << pkg_id << "\n"; + return; + } + + std::cout << "pkg " << pkg_id << " j:" << p_j.dump() << "\n"; + + switch (pkg_id) { + case pkg::PKGID::FRONTIER: { + pkg::Frontier pkg = p_j; + + if (!ctx.heard_gossip.count(pkg.agent) || ctx.heard_gossip[pkg.agent] < pkg.seq) { + ctx.heard_gossip[pkg.agent] = pkg.seq; + std::cout << "new seq " << pkg.seq << " from " << pkg.agent << "\n"; + } + break; + } + case pkg::PKGID::REQUEST_FRONTIER: { + pkg::RequestFrontier pkg = p_j; + break; + } + case pkg::PKGID::COMMAND: { + pkg::Command pkg = p_j; + + // push to buffer, if not in buffer + if (!ctx.buffer[pkg.agent].count(pkg.seq)) { + ctx.buffer[pkg.agent].emplace(pkg.seq, pkg); + std::cout << "pushed to buffer " << pkg.seq << " from " << pkg.agent << "\n"; + } + // TODO: notify something? + break; + } + case pkg::PKGID::REQUEST_COMMANDS: { + pkg::RequestCommands pkg = p_j; + break; + } + default: + std::cerr << "unknown pkg id " << pkg_id << "\n"; + break; + } +} + static void group_custom_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) { std::cout << "group_custom_packet_cb\n"; SharedContext& ctx = *static_cast(user_data); + handle_pkg(ctx, data, length); } static void group_custom_private_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) { std::cout << "group_custom_private_packet_cb\n"; SharedContext& ctx = *static_cast(user_data); + handle_pkg(ctx, data, length); } }