send requests, handle requests, more gossip

requests loop sometimes o.o
This commit is contained in:
Green Sky 2022-12-21 00:53:49 +01:00
parent d845baf804
commit 4aa2a152af
No known key found for this signature in database

View File

@ -18,6 +18,8 @@ extern "C" {
#include <mutex>
#include <atomic>
#include <chrono>
#include <random>
#include <utility>
#include <iostream>
#include <cassert>
@ -286,15 +288,13 @@ echo 'setup done'
} // namespace vim
// visibility hack
struct RequestCommands {
struct RequestCommand {
Agent agent;
uint64_t after_seq{0};
uint64_t until_seq{0};
uint64_t seq{0};
};
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestCommands,
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestCommand,
agent,
after_seq,
until_seq
seq
)
// hash for unordered_set
@ -321,6 +321,10 @@ struct SharedContext {
std::unordered_map<ToxPubKey, uint64_t> staging_frontier; // last seq we have in command_lists, via tox
// (can be lower then command_frontier for local agent
// TODO
std::mutex unknown_agents_mutex;
std::unordered_set<ToxPubKey> unknown_agents; // list of agents we read about but dont have in command/saging frontier
// contains remote changes with missing parent seq
// could merge into comamnd_lists
std::unordered_map<ToxPubKey, std::unordered_map<uint64_t, Command>> buffer;
@ -332,13 +336,14 @@ struct SharedContext {
// peer ids that requested the last known seq for agent
std::unordered_set<std::pair<uint32_t, Agent>> requested_frontier;
// peer ids that requested a command (range)
std::vector<std::pair<uint32_t, RequestCommands>> requested_commands;
// peer ids that requested a command
std::unordered_map<uint32_t, RequestCommand> requested_commands;
Tox* tox {nullptr};
bool tox_dht_online {false};
bool tox_group_online {false};
uint32_t tox_group_number {-1u};
std::unordered_set<uint32_t> tox_seen_peers;
};
namespace tox {
@ -348,9 +353,10 @@ namespace pkg {
enum PKGID : uint8_t {
FRONTIER = 32,
REQUEST_FRONTIER,
REQUEST_FRONTIERS,
COMMAND,
REQUEST_COMMANDS,
REQUEST_COMMAND,
};
// send the currently last known seq you have (excluding buffer)
@ -381,12 +387,11 @@ namespace pkg {
//uint64_t after_seq{0};
//uint64_t until_seq{0};
//};
using RequestCommands = ::RequestCommands;
using RequestCommand = ::RequestCommand;
//NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestCommands,
//NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestCommand,
//agent,
//after_seq,
//until_seq
//seq
//)
} // namespace pkg
@ -420,6 +425,8 @@ static void group_custom_private_packet_cb(Tox* tox, uint32_t group_number, uint
void toxThread(SharedContext* ctx) {
using namespace std::chrono_literals;
std::minstd_rand rng{1337};
TOX_ERR_OPTIONS_NEW err_opt_new;
Tox_Options* options = tox_options_new(&err_opt_new);
assert(err_opt_new == TOX_ERR_OPTIONS_NEW::TOX_ERR_OPTIONS_NEW_OK);
@ -477,10 +484,6 @@ void toxThread(SharedContext* ctx) {
// "thread local"
Agent agent_local;
//tox_group_self_get_public_key()
//tox_group_send_custom_packet()
//tox_group_send_custom_private_packet()
while (!ctx->should_quit) {
// tox iterate
tox_iterate(ctx->tox, ctx);
@ -512,13 +515,18 @@ void toxThread(SharedContext* ctx) {
std::cout << "tox connected to group\n";
}
} else { // do the thing
std::vector<std::pair<Agent, uint64_t>> missing_in_buffer;
{ // pump from buffer to staging
const size_t max_commands = 1;
const size_t max_commands = 2;
size_t number_of_commands_done = 0;
std::vector<Agent> empty_buffers;
{
std::lock_guard lg_staging{ctx->staging_mutex};
for (auto& [agent, buffer] : ctx->buffer) {
if (buffer.empty()) {
empty_buffers.push_back(agent);
continue;
}
if (agent == agent_local) {
// skip ? self
continue;
@ -534,7 +542,15 @@ void toxThread(SharedContext* ctx) {
}
if (!buffer.count(seq)) { // not in buffer, skip
std::cout << "!!! buffer not empty but not next seq\n";
// check if old in buffer
for (const auto& it : buffer) {
if (it.first < seq) {
assert(false && "buffer not clean !!");
}
}
//std::cout << "!!! buffer not empty but not next seq\n";
missing_in_buffer.push_back(std::make_pair(agent, seq));
continue;
}
@ -543,6 +559,7 @@ void toxThread(SharedContext* ctx) {
std::lock_guard lg{ctx->command_lists_mutex};
for (; buffer.count(seq); seq++) {
ctx->command_lists[agent][seq] = buffer.at(seq);
ctx->staging_frontier[agent] = seq;
seq_to_remove.push_back(seq);
number_of_commands_done++;
@ -551,7 +568,7 @@ void toxThread(SharedContext* ctx) {
}
}
ctx->staging_frontier[agent] = seq;
//ctx->staging_frontier[agent] = seq;
}
ctx->should_gossip_remote.emplace(agent);
@ -568,13 +585,70 @@ void toxThread(SharedContext* ctx) {
}
}
// request missing in buffer
{ // request missing in buffer
// (every tick lol)
for (const auto& [agent, seq] : missing_in_buffer) {
// ask random peer_id we have seen before
const uint32_t peer_id = *ctx->tox_seen_peers.cbegin() + (rng() % ctx->tox_seen_peers.size());
const auto status = tox_group_peer_get_connection_status(ctx->tox, ctx->tox_group_number, peer_id, nullptr);
if (status == TOX_CONNECTION_NONE) {
// bad luck, skip
// TODO: do seen peers cleanup
continue;
}
// send request for command
pkg::RequestCommand rc_pkg{
agent, seq
};
std::vector<uint8_t> data = nlohmann::json::to_msgpack(rc_pkg);
// prepend pkgid
data.emplace(data.begin(), static_cast<uint8_t>(pkg::PKGID::REQUEST_COMMAND));
Tox_Err_Group_Send_Custom_Private_Packet send_err{TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK};
if (!tox_group_send_custom_private_packet(ctx->tox, ctx->tox_group_number, peer_id, true, data.data(), data.size(), &send_err)) {
std::cerr << "failed to send command request packet for " << std::dec << seq << " from " << std::hex << (int)agent[0] << " " << std::dec << (int)send_err << "\n";
assert(send_err != TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_TOO_LONG);
} else {
std::cout << "sent command request packet for " << std::dec << seq << " from " << std::hex << (int)agent[0] << " to " << std::dec << peer_id << "\n";
}
}
}
// request frontier (implicit list of agents)
// only every couple of second, can get large
// OR get back random agent and do it often
// handle requests
{ // handle requests
// TODO: this lock is trash
if (!ctx->requested_commands.empty()) {
std::lock_guard lg{ctx->command_lists_mutex};
for (const auto& [peer_id, request] : ctx->requested_commands) {
if (ctx->command_lists.count(request.agent) && ctx->command_lists.at(request.agent).count(request.seq)) {
const auto& command = ctx->command_lists.at(request.agent).at(request.seq);
// send command
std::vector<uint8_t> data = nlohmann::json::to_msgpack(command);
// prepend pkgid
data.emplace(data.begin(), static_cast<uint8_t>(pkg::PKGID::COMMAND));
Tox_Err_Group_Send_Custom_Private_Packet send_err{TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK};
if (!tox_group_send_custom_private_packet(ctx->tox, ctx->tox_group_number, peer_id, true, data.data(), data.size(), &send_err)) {
std::cerr << "failed to send command packet " << send_err << "\n";
assert(send_err != TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_TOO_LONG);
} else {
std::cout << "sent requested command to " << peer_id << "\n";
}
}
// else, we dont care. maybe check staging too
}
// HACK: clear each tick
ctx->requested_commands.clear();
std::cout << "cleared requested commands\n";
}
}
{ // gossip frontier
// for mutex locking simplicity this is an either-or
@ -628,7 +702,36 @@ void toxThread(SharedContext* ctx) {
}
}
} else if (!ctx->should_gossip_remote.empty()) {
std::lock_guard lg{ctx->command_lists_mutex};
// we got new remote staged, lets amp the traffic
// only do first
auto it = ctx->should_gossip_remote.cbegin();
pkg::Frontier f_pkg{
*it,
0u
};
{ // lock
std::lock_guard lg{ctx->staging_mutex};
assert(ctx->staging_frontier.count(*it));
f_pkg.seq = ctx->staging_frontier.at(*it);
}
std::vector<uint8_t> data = nlohmann::json::to_msgpack(f_pkg);
// prepend pkgid
data.emplace(data.begin(), static_cast<uint8_t>(pkg::PKGID::FRONTIER));
Tox_Err_Group_Send_Custom_Packet send_err{TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK};
if (!tox_group_send_custom_packet(ctx->tox, 0, true, data.data(), data.size(), &send_err)) {
std::cerr << "failed to send gossip packet " << send_err << "\n";
assert(send_err != TOX_ERR_GROUP_SEND_CUSTOM_PACKET_TOO_LONG);
} else {
std::cout << "sent gossip of remote agent\n";
}
ctx->should_gossip_remote.erase(it);
}
}
}
@ -873,9 +976,12 @@ int main(void) {
seq = ctx.command_frontier[agent] + 1;
}
for (; seq <= staging_seq; seq++) {
assert(ctx.command_lists.count(agent));
assert(ctx.command_lists.at(agent).count(seq));
// !! this can get expensive, while we are holding locks :/
bool r = doc.apply(ctx.command_lists.at(agent).at(seq).ops);
// TODO: actually this can fail with missing parents of an agent we never heard about before
if (!r) {
std::cout << "faild to apply:\n";
for (const auto& op : ctx.command_lists.at(agent).at(seq).ops) {
@ -969,7 +1075,10 @@ int main(void) {
if (ctx.command_frontier.count(ctx.agent)) { // get last own seq
seq = ctx.command_frontier[ctx.agent] + 1;
}
const size_t max_ops {5}; // limit ops per command so we can fit them into packets
// 5 can be too much
// 3 seems save, but is slow
const size_t max_ops {4}; // limit ops per command so we can fit them into packets
size_t check_op_count {0};
for (size_t i = 0; i < ops.size(); seq++) {
// TODO: check
@ -1037,6 +1146,9 @@ static void handle_pkg(SharedContext& ctx, const uint8_t* data, size_t length, u
return;
}
// TODO: keep track of time/connected disconnected
ctx.tox_seen_peers.emplace(peer_id);
std::cout << "pkg " << pkg_id << " j:" << p_j.dump() << "\n";
switch (pkg_id) {
@ -1059,17 +1171,23 @@ static void handle_pkg(SharedContext& ctx, const uint8_t* data, size_t length, u
// push to buffer, if not in buffer
if (!ctx.buffer[pkg.agent].count(pkg.seq)) {
{ // also check staging frontier, if its not a dup
std::lock_guard lg {ctx.staging_mutex};
if (ctx.staging_frontier.count(pkg.agent) && pkg.seq <= ctx.staging_frontier.at(pkg.agent)) {
break; // allready in staging or master
}
}
ctx.buffer[pkg.agent].emplace(pkg.seq, pkg);
std::cout << "pushed to buffer " << pkg.seq << " from " << pkg.agent << "\n";
}
// TODO: notify something?
break;
}
case pkg::PKGID::REQUEST_COMMANDS: {
pkg::RequestCommands pkg = p_j;
case pkg::PKGID::REQUEST_COMMAND: {
pkg::RequestCommand pkg = p_j;
// TODO: this can lead to double requests
// TODO: maybe settle for single seq requests for now?, since they are indivitual packets anyway
ctx.requested_commands.push_back(std::make_pair(peer_id, pkg));
ctx.requested_commands[peer_id] = pkg;
break;
}
default:
@ -1081,12 +1199,14 @@ static void handle_pkg(SharedContext& ctx, const uint8_t* data, size_t length, u
static void group_custom_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) {
std::cout << "group_custom_packet_cb\n";
SharedContext& ctx = *static_cast<SharedContext*>(user_data);
assert(ctx.tox_group_number == group_number);
handle_pkg(ctx, data, length, peer_id);
}
static void group_custom_private_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) {
std::cout << "group_custom_private_packet_cb\n";
SharedContext& ctx = *static_cast<SharedContext*>(user_data);
assert(ctx.tox_group_number == group_number);
handle_pkg(ctx, data, length, peer_id);
}