gossip p1 works, command 2 buffer works if not too large

Green Sky 2022-12-19 21:33:14 +01:00
parent 7177d90c44
commit e961b8aec3
1 changed file with 166 additions and 6 deletions


@@ -1,3 +1,4 @@
#include "toxcore/tox.h"
#include <crdt/text_document.hpp>
#include <nlohmann/json.hpp>
@@ -61,7 +62,7 @@ using Doc = GreenCRDT::TextDocument<Agent>;
using ListType = Doc::ListType;
struct Command {
- Agent actor;
+ Agent agent;
uint64_t seq {0}; // independent of the ops inside, theoretically
//...
std::vector<Doc::Op> ops;
@@ -129,7 +130,7 @@ namespace std {
} // namespace std
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(Command,
- actor,
+ agent,
seq,
ops
)
@@ -296,8 +297,9 @@ struct SharedContext {
// could merge into command_lists
std::unordered_map<ToxPubKey, std::unordered_map<uint64_t, Command>> buffer;
- std::atomic_bool should_gossip_local; // local changes (set by main thread, reset by tox thread)
+ std::atomic_bool should_gossip_local{false}; // local changes (set by main thread, reset by tox thread)
std::unordered_set<ToxPubKey> should_gossip_remote; // list of ids we have new seq for (only modified by tox thread)
std::unordered_map<ToxPubKey, uint64_t> heard_gossip; // seq frontiers we have heard about
Tox* tox {nullptr};
bool tox_dht_online {false};
@@ -307,6 +309,51 @@ struct SharedContext {
namespace tox {
namespace pkg {
enum PKGID : uint8_t {
FRONTIER = 32,
REQUEST_FRONTIER,
COMMAND,
REQUEST_COMMANDS,
};
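// wire format: each custom packet is a single PKGID byte followed by the
// msgpack-encoded struct for that id (see the send code and handle_pkg below)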
// send the currently last known seq you have (excluding buffer)
struct Frontier {
Agent agent;
uint64_t seq{0};
};
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(Frontier,
agent,
seq
)
// request the last known seq another peer has for agent
struct RequestFrontier {
Agent agent;
};
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestFrontier,
agent
)
using Command = ::Command;
// request every command for agent after seq (inclusive)
struct RequestCommands {
Agent agent;
uint64_t seq{0};
};
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestCommands,
agent,
seq
)
} // namespace pkg
static std::vector<uint8_t> hex2bin(const std::string& str) {
std::vector<uint8_t> bin{};
bin.resize(str.size()/2, 0);
@@ -390,6 +437,9 @@ void toxThread(SharedContext* ctx) {
}
}
// "thread local"
Agent agent_local;
//tox_group_self_get_public_key()
//tox_group_send_custom_packet()
//tox_group_send_custom_private_packet()
@@ -416,6 +466,7 @@ void toxThread(SharedContext* ctx) {
ctx->agent_set.set_value();
return; // fuck everything
}
agent_local = ctx->agent;
ctx->agent_set.set_value();
}
} else if (!ctx->tox_group_online) { // then wait for group to connect
@@ -424,9 +475,66 @@ void toxThread(SharedContext* ctx) {
std::cout << "tox connected to group\n";
}
} else { // do the thing
- // staging?
+ // pump from buffer to staging
// request missing in buffer
// request frontier (implicit list of agents)
// only every couple of seconds, can get large
// OR get back a random agent and do it often
// handle requests
// send tip (prio self)
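// note: only the local-agent frontier+command gossip below is implemented so far;
// the steps listed above are still TODO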
{ // gossip frontier
// for mutex locking simplicity this is an either-or
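// exchange(false) atomically claims the pending local change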
if (ctx->should_gossip_local.exchange(false)) {
pkg::Frontier f_pkg{
agent_local,
0u
};
pkg::Command c_pkg{
agent_local,
0u,
{}
};
{ // lock
std::lock_guard lg{ctx->command_lists_mutex};
assert(ctx->command_frontier.count(agent_local));
f_pkg.seq = ctx->command_frontier.at(agent_local);
c_pkg = ctx->command_lists[agent_local][f_pkg.seq];
}
{ // gossip
std::vector<uint8_t> data = nlohmann::json::to_msgpack(f_pkg);
// prepend pkgid
data.emplace(data.begin(), static_cast<uint8_t>(pkg::PKGID::FRONTIER));
if (!tox_group_send_custom_packet(ctx->tox, 0, true, data.data(), data.size(), nullptr)) {
std::cerr << "failed to send gossip packet of local agent\n";
// TODO: set should_gossip_local back to true?
} else {
std::cout << "sent gossip of local agent\n";
}
}
{ // command
std::vector<uint8_t> data = nlohmann::json::to_msgpack(c_pkg);
// prepend pkgid
data.emplace(data.begin(), static_cast<uint8_t>(pkg::PKGID::COMMAND));
if (!tox_group_send_custom_packet(ctx->tox, 0, true, data.data(), data.size(), nullptr)) {
std::cerr << "failed to send command packet of local agent\n";
} else {
std::cout << "sent command of local agent\n";
}
}
} else if (!ctx->should_gossip_remote.empty()) {
std::lock_guard lg{ctx->command_lists_mutex};
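// TODO: not implemented yet; presumably this should send frontiers/commands
// for the agents collected in should_gossip_remote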
}
}
}
std::this_thread::sleep_for(20ms);
@@ -751,7 +859,7 @@ int main(void) {
auto& local_command_list = ctx.command_lists[ctx.agent];
uint64_t seq {0};
if (ctx.command_frontier.count(ctx.agent)) { // get last own seq
- seq = ctx.command_frontier[ctx.agent]++;
+ seq = ctx.command_frontier[ctx.agent] + 1;
}
local_command_list.emplace(seq, Command{
ctx.agent,
@@ -789,14 +897,66 @@ static void self_connection_status_cb(Tox*, TOX_CONNECTION connection_status, vo
std::cout << "self_connection_status_cb " << connection_status << "\n";
}
static void handle_pkg(SharedContext& ctx, const uint8_t* data, size_t length) {
if (length < 2) {
std::cerr << "got too short pkg " << length << "\n";
return;
}
pkg::PKGID pkg_id = static_cast<pkg::PKGID>(data[0]);
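// parse the msgpack payload; allow_exceptions=false, so parse errors show up as a discarded value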
const auto p_j = nlohmann::json::from_msgpack(data+1, data+1 + (length-1), true, false);
if (p_j.is_discarded()) {
std::cerr << "got invalid msgpack for " << pkg_id << "\n";
return;
}
std::cout << "pkg " << pkg_id << " j:" << p_j.dump() << "\n";
switch (pkg_id) {
case pkg::PKGID::FRONTIER: {
pkg::Frontier pkg = p_j;
if (!ctx.heard_gossip.count(pkg.agent) || ctx.heard_gossip[pkg.agent] < pkg.seq) {
ctx.heard_gossip[pkg.agent] = pkg.seq;
std::cout << "new seq " << pkg.seq << " from " << pkg.agent << "\n";
}
break;
}
case pkg::PKGID::REQUEST_FRONTIER: {
pkg::RequestFrontier pkg = p_j;
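// TODO: not handled yet; presumably answer with a Frontier packet for pkg.agent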
break;
}
case pkg::PKGID::COMMAND: {
pkg::Command pkg = p_j;
// push to buffer, if not in buffer
if (!ctx.buffer[pkg.agent].count(pkg.seq)) {
ctx.buffer[pkg.agent].emplace(pkg.seq, pkg);
std::cout << "pushed to buffer " << pkg.seq << " from " << pkg.agent << "\n";
}
// TODO: notify something?
break;
}
case pkg::PKGID::REQUEST_COMMANDS: {
pkg::RequestCommands pkg = p_j;
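// TODO: not handled yet; presumably answer with the requested Command packets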
break;
}
default:
std::cerr << "unknown pkg id " << pkg_id << "\n";
break;
}
}
static void group_custom_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) {
std::cout << "group_custom_packet_cb\n";
SharedContext& ctx = *static_cast<SharedContext*>(user_data);
handle_pkg(ctx, data, length);
}
static void group_custom_private_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) {
std::cout << "group_custom_private_packet_cb\n";
SharedContext& ctx = *static_cast<SharedContext*>(user_data);
handle_pkg(ctx, data, length);
}
} }