diff --git a/vim_research/test2.cpp b/vim_research/test2.cpp index 195a9b6..7d96e1b 100644 --- a/vim_research/test2.cpp +++ b/vim_research/test2.cpp @@ -18,6 +18,8 @@ extern "C" { #include #include #include +#include +#include #include #include @@ -286,15 +288,13 @@ echo 'setup done' } // namespace vim // visibility hack -struct RequestCommands { +struct RequestCommand { Agent agent; - uint64_t after_seq{0}; - uint64_t until_seq{0}; + uint64_t seq{0}; }; -NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestCommands, +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestCommand, agent, - after_seq, - until_seq + seq ) // hash for unordered_set @@ -321,6 +321,10 @@ struct SharedContext { std::unordered_map staging_frontier; // last seq we have in command_lists, via tox // (can be lower then command_frontier for local agent + // TODO + std::mutex unknown_agents_mutex; + std::unordered_set unknown_agents; // list of agents we read about but dont have in command/saging frontier + // contains remote changes with missing parent seq // could merge into comamnd_lists std::unordered_map> buffer; @@ -332,13 +336,14 @@ struct SharedContext { // peer ids that requested the last known seq for agent std::unordered_set> requested_frontier; - // peer ids that requested a command (range) - std::vector> requested_commands; + // peer ids that requested a command + std::unordered_map requested_commands; Tox* tox {nullptr}; bool tox_dht_online {false}; bool tox_group_online {false}; uint32_t tox_group_number {-1u}; + std::unordered_set tox_seen_peers; }; namespace tox { @@ -348,9 +353,10 @@ namespace pkg { enum PKGID : uint8_t { FRONTIER = 32, REQUEST_FRONTIER, + REQUEST_FRONTIERS, COMMAND, - REQUEST_COMMANDS, + REQUEST_COMMAND, }; // send the currently last known seq you have (excluding buffer) @@ -381,12 +387,11 @@ namespace pkg { //uint64_t after_seq{0}; //uint64_t until_seq{0}; //}; - using RequestCommands = ::RequestCommands; + using RequestCommand = ::RequestCommand; - //NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestCommands, + //NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestCommand, //agent, - //after_seq, - //until_seq + //seq //) } // namespace pkg @@ -420,6 +425,8 @@ static void group_custom_private_packet_cb(Tox* tox, uint32_t group_number, uint void toxThread(SharedContext* ctx) { using namespace std::chrono_literals; + std::minstd_rand rng{1337}; + TOX_ERR_OPTIONS_NEW err_opt_new; Tox_Options* options = tox_options_new(&err_opt_new); assert(err_opt_new == TOX_ERR_OPTIONS_NEW::TOX_ERR_OPTIONS_NEW_OK); @@ -458,8 +465,8 @@ void toxThread(SharedContext* ctx) { }; DHT_node nodes[] { - {"tox.plastiras.org", 33445, "8E8B63299B3D520FB377FE5100E65E3322F7AE5B20A0ACED2981769FC5B43725", {}}, // 14 - {"tox2.plastiras.org", 33445, "B6626D386BE7E3ACA107B46F48A5C4D522D29281750D44A0CBA6A2721E79C951", {}}, // 14 + {"tox.plastiras.org", 33445, "8E8B63299B3D520FB377FE5100E65E3322F7AE5B20A0ACED2981769FC5B43725", {}}, // 14 + {"tox2.plastiras.org", 33445, "B6626D386BE7E3ACA107B46F48A5C4D522D29281750D44A0CBA6A2721E79C951", {}}, // 14 }; for (size_t i = 0; i < sizeof(nodes)/sizeof(DHT_node); i ++) { @@ -477,10 +484,6 @@ void toxThread(SharedContext* ctx) { // "thread local" Agent agent_local; - //tox_group_self_get_public_key() - //tox_group_send_custom_packet() - //tox_group_send_custom_private_packet() - while (!ctx->should_quit) { // tox iterate tox_iterate(ctx->tox, ctx); @@ -512,13 +515,18 @@ void toxThread(SharedContext* ctx) { std::cout << "tox connected to group\n"; } } else { // do the thing + std::vector> missing_in_buffer; { // pump from buffer to staging - const size_t max_commands = 1; + const size_t max_commands = 2; size_t number_of_commands_done = 0; std::vector empty_buffers; { std::lock_guard lg_staging{ctx->staging_mutex}; for (auto& [agent, buffer] : ctx->buffer) { + if (buffer.empty()) { + empty_buffers.push_back(agent); + continue; + } if (agent == agent_local) { // skip ? self continue; @@ -534,7 +542,15 @@ void toxThread(SharedContext* ctx) { } if (!buffer.count(seq)) { // not in buffer, skip - std::cout << "!!! buffer not empty but not next seq\n"; + // check if old in buffer + for (const auto& it : buffer) { + if (it.first < seq) { + assert(false && "buffer not clean !!"); + } + } + + //std::cout << "!!! buffer not empty but not next seq\n"; + missing_in_buffer.push_back(std::make_pair(agent, seq)); continue; } @@ -543,6 +559,7 @@ void toxThread(SharedContext* ctx) { std::lock_guard lg{ctx->command_lists_mutex}; for (; buffer.count(seq); seq++) { ctx->command_lists[agent][seq] = buffer.at(seq); + ctx->staging_frontier[agent] = seq; seq_to_remove.push_back(seq); number_of_commands_done++; @@ -551,7 +568,7 @@ void toxThread(SharedContext* ctx) { } } - ctx->staging_frontier[agent] = seq; + //ctx->staging_frontier[agent] = seq; } ctx->should_gossip_remote.emplace(agent); @@ -568,13 +585,70 @@ void toxThread(SharedContext* ctx) { } } - // request missing in buffer + { // request missing in buffer + // (every tick lol) + for (const auto& [agent, seq] : missing_in_buffer) { + // ask random peer_id we have seen before + const uint32_t peer_id = *ctx->tox_seen_peers.cbegin() + (rng() % ctx->tox_seen_peers.size()); + const auto status = tox_group_peer_get_connection_status(ctx->tox, ctx->tox_group_number, peer_id, nullptr); + if (status == TOX_CONNECTION_NONE) { + // bad luck, skip + // TODO: do seen peers cleanup + continue; + } + + // send request for command + pkg::RequestCommand rc_pkg{ + agent, seq + }; + std::vector data = nlohmann::json::to_msgpack(rc_pkg); + // prepend pkgid + data.emplace(data.begin(), static_cast(pkg::PKGID::REQUEST_COMMAND)); + + Tox_Err_Group_Send_Custom_Private_Packet send_err{TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK}; + if (!tox_group_send_custom_private_packet(ctx->tox, ctx->tox_group_number, peer_id, true, data.data(), data.size(), &send_err)) { + std::cerr << "failed to send command request packet for " << std::dec << seq << " from " << std::hex << (int)agent[0] << " " << std::dec << (int)send_err << "\n"; + assert(send_err != TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_TOO_LONG); + } else { + std::cout << "sent command request packet for " << std::dec << seq << " from " << std::hex << (int)agent[0] << " to " << std::dec << peer_id << "\n"; + } + } + } // request frontier (implicit list of agents) // only every couple of second, can get large // OR get back random agent and do it often - // handle requests + { // handle requests + // TODO: this lock is trash + if (!ctx->requested_commands.empty()) { + std::lock_guard lg{ctx->command_lists_mutex}; + for (const auto& [peer_id, request] : ctx->requested_commands) { + if (ctx->command_lists.count(request.agent) && ctx->command_lists.at(request.agent).count(request.seq)) { + const auto& command = ctx->command_lists.at(request.agent).at(request.seq); + + // send command + + std::vector data = nlohmann::json::to_msgpack(command); + // prepend pkgid + data.emplace(data.begin(), static_cast(pkg::PKGID::COMMAND)); + + Tox_Err_Group_Send_Custom_Private_Packet send_err{TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK}; + if (!tox_group_send_custom_private_packet(ctx->tox, ctx->tox_group_number, peer_id, true, data.data(), data.size(), &send_err)) { + std::cerr << "failed to send command packet " << send_err << "\n"; + assert(send_err != TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_TOO_LONG); + } else { + std::cout << "sent requested command to " << peer_id << "\n"; + } + } + // else, we dont care. maybe check staging too + } + + // HACK: clear each tick + ctx->requested_commands.clear(); + std::cout << "cleared requested commands\n"; + } + } { // gossip frontier // for mutex locking simplicity this is an either-or @@ -628,7 +702,36 @@ void toxThread(SharedContext* ctx) { } } } else if (!ctx->should_gossip_remote.empty()) { - std::lock_guard lg{ctx->command_lists_mutex}; + // we got new remote staged, lets amp the traffic + + // only do first + auto it = ctx->should_gossip_remote.cbegin(); + + pkg::Frontier f_pkg{ + *it, + 0u + }; + + { // lock + std::lock_guard lg{ctx->staging_mutex}; + assert(ctx->staging_frontier.count(*it)); + + f_pkg.seq = ctx->staging_frontier.at(*it); + } + + std::vector data = nlohmann::json::to_msgpack(f_pkg); + // prepend pkgid + data.emplace(data.begin(), static_cast(pkg::PKGID::FRONTIER)); + + Tox_Err_Group_Send_Custom_Packet send_err{TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK}; + if (!tox_group_send_custom_packet(ctx->tox, 0, true, data.data(), data.size(), &send_err)) { + std::cerr << "failed to send gossip packet " << send_err << "\n"; + assert(send_err != TOX_ERR_GROUP_SEND_CUSTOM_PACKET_TOO_LONG); + } else { + std::cout << "sent gossip of remote agent\n"; + } + + ctx->should_gossip_remote.erase(it); } } } @@ -873,9 +976,12 @@ int main(void) { seq = ctx.command_frontier[agent] + 1; } for (; seq <= staging_seq; seq++) { + assert(ctx.command_lists.count(agent)); + assert(ctx.command_lists.at(agent).count(seq)); // !! this can get expensive, while we are holding locks :/ bool r = doc.apply(ctx.command_lists.at(agent).at(seq).ops); + // TODO: actually this can fail with missing parents of an agent we never heard about before if (!r) { std::cout << "faild to apply:\n"; for (const auto& op : ctx.command_lists.at(agent).at(seq).ops) { @@ -969,7 +1075,10 @@ int main(void) { if (ctx.command_frontier.count(ctx.agent)) { // get last own seq seq = ctx.command_frontier[ctx.agent] + 1; } - const size_t max_ops {5}; // limit ops per command so we can fit them into packets + + // 5 can be too much + // 3 seems save, but is slow + const size_t max_ops {4}; // limit ops per command so we can fit them into packets size_t check_op_count {0}; for (size_t i = 0; i < ops.size(); seq++) { // TODO: check @@ -1037,6 +1146,9 @@ static void handle_pkg(SharedContext& ctx, const uint8_t* data, size_t length, u return; } + // TODO: keep track of time/connected disconnected + ctx.tox_seen_peers.emplace(peer_id); + std::cout << "pkg " << pkg_id << " j:" << p_j.dump() << "\n"; switch (pkg_id) { @@ -1059,17 +1171,23 @@ static void handle_pkg(SharedContext& ctx, const uint8_t* data, size_t length, u // push to buffer, if not in buffer if (!ctx.buffer[pkg.agent].count(pkg.seq)) { + { // also check staging frontier, if its not a dup + std::lock_guard lg {ctx.staging_mutex}; + if (ctx.staging_frontier.count(pkg.agent) && pkg.seq <= ctx.staging_frontier.at(pkg.agent)) { + break; // allready in staging or master + } + } ctx.buffer[pkg.agent].emplace(pkg.seq, pkg); std::cout << "pushed to buffer " << pkg.seq << " from " << pkg.agent << "\n"; } // TODO: notify something? break; } - case pkg::PKGID::REQUEST_COMMANDS: { - pkg::RequestCommands pkg = p_j; + case pkg::PKGID::REQUEST_COMMAND: { + pkg::RequestCommand pkg = p_j; // TODO: this can lead to double requests // TODO: maybe settle for single seq requests for now?, since they are indivitual packets anyway - ctx.requested_commands.push_back(std::make_pair(peer_id, pkg)); + ctx.requested_commands[peer_id] = pkg; break; } default: @@ -1081,12 +1199,14 @@ static void handle_pkg(SharedContext& ctx, const uint8_t* data, size_t length, u static void group_custom_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) { std::cout << "group_custom_packet_cb\n"; SharedContext& ctx = *static_cast(user_data); + assert(ctx.tox_group_number == group_number); handle_pkg(ctx, data, length, peer_id); } static void group_custom_private_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) { std::cout << "group_custom_private_packet_cb\n"; SharedContext& ctx = *static_cast(user_data); + assert(ctx.tox_group_number == group_number); handle_pkg(ctx, data, length, peer_id); }