mirror of
https://github.com/Green-Sky/crdt_tests.git
synced 2024-12-22 23:53:24 +01:00
Compare commits
No commits in common. "e961b8aec318c2389a9122e4c6f61d9f36cc2345" and "2c10e258c09f7b54f7c31290381fd90037911bf0" have entirely different histories.
e961b8aec3
...
2c10e258c0
@ -1,4 +1,3 @@
|
|||||||
#include "toxcore/tox.h"
|
|
||||||
#include <crdt/text_document.hpp>
|
#include <crdt/text_document.hpp>
|
||||||
#include <nlohmann/json.hpp>
|
#include <nlohmann/json.hpp>
|
||||||
|
|
||||||
@ -11,7 +10,6 @@ extern "C" {
|
|||||||
#include <optional>
|
#include <optional>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <unordered_set>
|
|
||||||
#include <string_view>
|
#include <string_view>
|
||||||
#include <variant>
|
#include <variant>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
@ -62,7 +60,7 @@ using Doc = GreenCRDT::TextDocument<Agent>;
|
|||||||
using ListType = Doc::ListType;
|
using ListType = Doc::ListType;
|
||||||
|
|
||||||
struct Command {
|
struct Command {
|
||||||
Agent agent;
|
Agent actor;
|
||||||
uint64_t seq {0}; // independed of the ops inside, theoretically
|
uint64_t seq {0}; // independed of the ops inside, theoretically
|
||||||
//...
|
//...
|
||||||
std::vector<Doc::Op> ops;
|
std::vector<Doc::Op> ops;
|
||||||
@ -130,7 +128,7 @@ namespace std {
|
|||||||
} // namespace std
|
} // namespace std
|
||||||
|
|
||||||
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(Command,
|
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(Command,
|
||||||
agent,
|
actor,
|
||||||
seq,
|
seq,
|
||||||
ops
|
ops
|
||||||
)
|
)
|
||||||
@ -284,22 +282,14 @@ struct SharedContext {
|
|||||||
ToxPubKey agent;
|
ToxPubKey agent;
|
||||||
std::promise<void> agent_set;
|
std::promise<void> agent_set;
|
||||||
|
|
||||||
std::mutex command_lists_mutex; // for list and frontier!!
|
// TODO: this is inefficent
|
||||||
std::unordered_map<ToxPubKey, std::unordered_map<uint64_t, Command>> command_lists;
|
std::mutex command_lists_mutex;
|
||||||
std::unordered_map<ToxPubKey, uint64_t> command_frontier; // last applied seq
|
std::unordered_map<ToxPubKey, std::vector<Command>> command_lists;
|
||||||
|
// remote op queue for receive
|
||||||
|
// local op list for remote lookups
|
||||||
|
|
||||||
// contains remote changes we can apply in the main thread
|
// last seq for all known agents
|
||||||
std::mutex staging_mutex;
|
// bool dirty
|
||||||
std::unordered_map<ToxPubKey, uint64_t> staging_frontier; // last seq we have in command_lists, via tox
|
|
||||||
// (can be lower then command_frontier for local agent
|
|
||||||
|
|
||||||
// contains remote changes with missing parent seq
|
|
||||||
// could merge into comamnd_lists
|
|
||||||
std::unordered_map<ToxPubKey, std::unordered_map<uint64_t, Command>> buffer;
|
|
||||||
|
|
||||||
std::atomic_bool should_gossip_local{false}; // local changes (set by main thread, reset by tox thread)
|
|
||||||
std::unordered_set<ToxPubKey> should_gossip_remote; // list of ids we have new seq for (only modified by tox thread)
|
|
||||||
std::unordered_map<ToxPubKey, uint64_t> heard_gossip; // seq frontiers we have heard about
|
|
||||||
|
|
||||||
Tox* tox {nullptr};
|
Tox* tox {nullptr};
|
||||||
bool tox_dht_online {false};
|
bool tox_dht_online {false};
|
||||||
@ -309,51 +299,6 @@ struct SharedContext {
|
|||||||
|
|
||||||
namespace tox {
|
namespace tox {
|
||||||
|
|
||||||
namespace pkg {
|
|
||||||
|
|
||||||
enum PKGID : uint8_t {
|
|
||||||
FRONTIER = 32,
|
|
||||||
REQUEST_FRONTIER,
|
|
||||||
|
|
||||||
COMMAND,
|
|
||||||
REQUEST_COMMANDS,
|
|
||||||
};
|
|
||||||
|
|
||||||
// send the currently last known seq you have (excluding buffer)
|
|
||||||
struct Frontier {
|
|
||||||
Agent agent;
|
|
||||||
uint64_t seq{0};
|
|
||||||
};
|
|
||||||
|
|
||||||
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(Frontier,
|
|
||||||
agent,
|
|
||||||
seq
|
|
||||||
)
|
|
||||||
|
|
||||||
// request the last known seq another peer has for agent
|
|
||||||
struct RequestFrontier {
|
|
||||||
Agent agent;
|
|
||||||
};
|
|
||||||
|
|
||||||
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestFrontier,
|
|
||||||
agent
|
|
||||||
)
|
|
||||||
|
|
||||||
using Command = ::Command;
|
|
||||||
|
|
||||||
// request every command for agent after seq (inclusive)
|
|
||||||
struct RequestCommands {
|
|
||||||
Agent agent;
|
|
||||||
uint64_t seq{0};
|
|
||||||
};
|
|
||||||
|
|
||||||
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(RequestCommands,
|
|
||||||
agent,
|
|
||||||
seq
|
|
||||||
)
|
|
||||||
|
|
||||||
} // namespace pkg
|
|
||||||
|
|
||||||
static std::vector<uint8_t> hex2bin(const std::string& str) {
|
static std::vector<uint8_t> hex2bin(const std::string& str) {
|
||||||
std::vector<uint8_t> bin{};
|
std::vector<uint8_t> bin{};
|
||||||
bin.resize(str.size()/2, 0);
|
bin.resize(str.size()/2, 0);
|
||||||
@ -437,9 +382,6 @@ void toxThread(SharedContext* ctx) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// "thread local"
|
|
||||||
Agent agent_local;
|
|
||||||
|
|
||||||
//tox_group_self_get_public_key()
|
//tox_group_self_get_public_key()
|
||||||
//tox_group_send_custom_packet()
|
//tox_group_send_custom_packet()
|
||||||
//tox_group_send_custom_private_packet()
|
//tox_group_send_custom_private_packet()
|
||||||
@ -466,7 +408,6 @@ void toxThread(SharedContext* ctx) {
|
|||||||
ctx->agent_set.set_value();
|
ctx->agent_set.set_value();
|
||||||
return; // fuck everything
|
return; // fuck everything
|
||||||
}
|
}
|
||||||
agent_local = ctx->agent;
|
|
||||||
ctx->agent_set.set_value();
|
ctx->agent_set.set_value();
|
||||||
}
|
}
|
||||||
} else if (!ctx->tox_group_online) { // then wait for group to connect
|
} else if (!ctx->tox_group_online) { // then wait for group to connect
|
||||||
@ -475,66 +416,9 @@ void toxThread(SharedContext* ctx) {
|
|||||||
std::cout << "tox connected to group\n";
|
std::cout << "tox connected to group\n";
|
||||||
}
|
}
|
||||||
} else { // do the thing
|
} else { // do the thing
|
||||||
// pump from buffer to staging
|
// staging?
|
||||||
|
|
||||||
// request missing in buffer
|
|
||||||
|
|
||||||
// request frontier (implicit list of agents)
|
|
||||||
// only every couple of second, can get large
|
|
||||||
// OR get back random agent and do it often
|
|
||||||
|
|
||||||
// handle requests
|
// handle requests
|
||||||
|
// send tip (prio self)
|
||||||
{ // gossip frontier
|
|
||||||
// for mutex locking simplicity this is an either-or
|
|
||||||
if (ctx->should_gossip_local.exchange(false)) {
|
|
||||||
pkg::Frontier f_pkg{
|
|
||||||
agent_local,
|
|
||||||
0u
|
|
||||||
};
|
|
||||||
|
|
||||||
pkg::Command c_pkg{
|
|
||||||
agent_local,
|
|
||||||
0u,
|
|
||||||
{}
|
|
||||||
};
|
|
||||||
|
|
||||||
{ // lock
|
|
||||||
std::lock_guard lg{ctx->command_lists_mutex};
|
|
||||||
assert(ctx->command_frontier.count(agent_local));
|
|
||||||
|
|
||||||
f_pkg.seq = ctx->command_frontier.at(agent_local);
|
|
||||||
|
|
||||||
c_pkg = ctx->command_lists[agent_local][f_pkg.seq];
|
|
||||||
}
|
|
||||||
|
|
||||||
{ // gossip
|
|
||||||
std::vector<uint8_t> data = nlohmann::json::to_msgpack(f_pkg);
|
|
||||||
// prepend pkgid
|
|
||||||
data.emplace(data.begin(), static_cast<uint8_t>(pkg::PKGID::FRONTIER));
|
|
||||||
|
|
||||||
if (!tox_group_send_custom_packet(ctx->tox, 0, true, data.data(), data.size(), nullptr)) {
|
|
||||||
std::cerr << "failed to send gossip packet of local agent\n";
|
|
||||||
// TODO: set should_gossip_local back to true?
|
|
||||||
} else {
|
|
||||||
std::cout << "sent gossip of local agent\n";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
{ // command
|
|
||||||
std::vector<uint8_t> data = nlohmann::json::to_msgpack(c_pkg);
|
|
||||||
// prepend pkgid
|
|
||||||
data.emplace(data.begin(), static_cast<uint8_t>(pkg::PKGID::COMMAND));
|
|
||||||
|
|
||||||
if (!tox_group_send_custom_packet(ctx->tox, 0, true, data.data(), data.size(), nullptr)) {
|
|
||||||
std::cerr << "failed to send command packet of local agent\n";
|
|
||||||
} else {
|
|
||||||
std::cout << "sent command of local agent\n";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (!ctx->should_gossip_remote.empty()) {
|
|
||||||
std::lock_guard lg{ctx->command_lists_mutex};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::this_thread::sleep_for(20ms);
|
std::this_thread::sleep_for(20ms);
|
||||||
@ -619,43 +503,28 @@ int main(void) {
|
|||||||
std::cout << "waiting for agent id\n";
|
std::cout << "waiting for agent id\n";
|
||||||
ctx.agent_set.get_future().wait();
|
ctx.agent_set.get_future().wait();
|
||||||
if (ctx.should_quit) {
|
if (ctx.should_quit) {
|
||||||
tox_thread.join(); // wait for thread
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "starting vim ipc server\n";
|
std::cout << "starting vim ipc server\n";
|
||||||
|
|
||||||
if (zed_net_init() != 0) {
|
if (zed_net_init() != 0) {
|
||||||
ctx.should_quit.store(true);
|
|
||||||
std::cerr << "zed_net_init failed: " << zed_net_get_error() << "\n";
|
std::cerr << "zed_net_init failed: " << zed_net_get_error() << "\n";
|
||||||
tox_thread.join(); // wait for thread
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "initialized zed_net\n";
|
std::cout << "initialized zed_net\n";
|
||||||
|
|
||||||
const uint16_t port_start {1337};
|
const uint16_t port {1337};
|
||||||
const uint16_t port_end {1437};
|
|
||||||
uint16_t port = port_start;
|
|
||||||
zed_net_socket_t listen_socket;
|
zed_net_socket_t listen_socket;
|
||||||
bool found_free_port {false};
|
if (zed_net_tcp_socket_open(
|
||||||
for (; port <= port_end; port++) {
|
&listen_socket,
|
||||||
if (zed_net_tcp_socket_open(
|
port, // port
|
||||||
&listen_socket,
|
0, // non blocking
|
||||||
port, // port
|
1 // listen
|
||||||
0, // non blocking
|
) != 0) {
|
||||||
1 // listen
|
|
||||||
) == 0) {
|
|
||||||
found_free_port = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!found_free_port) {
|
|
||||||
ctx.should_quit.store(true);
|
|
||||||
std::cerr << "zed_net_tcp_socket_open failed: " << zed_net_get_error() << "\n";
|
std::cerr << "zed_net_tcp_socket_open failed: " << zed_net_get_error() << "\n";
|
||||||
zed_net_shutdown();
|
zed_net_shutdown();
|
||||||
tox_thread.join(); // wait for thread
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -669,11 +538,9 @@ int main(void) {
|
|||||||
zed_net_socket_t remote_socket;
|
zed_net_socket_t remote_socket;
|
||||||
zed_net_address_t remote_address;
|
zed_net_address_t remote_address;
|
||||||
if (zed_net_tcp_accept(&listen_socket, &remote_socket, &remote_address) != 0) {
|
if (zed_net_tcp_accept(&listen_socket, &remote_socket, &remote_address) != 0) {
|
||||||
ctx.should_quit.store(true);
|
|
||||||
std::cerr << "zed_net_tcp_accept failed: " << zed_net_get_error() << "\n";
|
std::cerr << "zed_net_tcp_accept failed: " << zed_net_get_error() << "\n";
|
||||||
zed_net_socket_close(&listen_socket);
|
zed_net_socket_close(&listen_socket);
|
||||||
zed_net_shutdown();
|
zed_net_shutdown();
|
||||||
tox_thread.join(); // wait for thread
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -693,12 +560,10 @@ int main(void) {
|
|||||||
int64_t bytes_received {0};
|
int64_t bytes_received {0};
|
||||||
bytes_received = zed_net_tcp_socket_receive(&remote_socket, buffer->data(), buffer->size());
|
bytes_received = zed_net_tcp_socket_receive(&remote_socket, buffer->data(), buffer->size());
|
||||||
if (bytes_received < 0) {
|
if (bytes_received < 0) {
|
||||||
ctx.should_quit.store(true);
|
|
||||||
std::cerr << "zed_net_tcp_socket_receive failed: " << zed_net_get_error() << "\n";
|
std::cerr << "zed_net_tcp_socket_receive failed: " << zed_net_get_error() << "\n";
|
||||||
zed_net_socket_close(&remote_socket);
|
zed_net_socket_close(&remote_socket);
|
||||||
zed_net_socket_close(&listen_socket);
|
zed_net_socket_close(&listen_socket);
|
||||||
zed_net_shutdown();
|
zed_net_shutdown();
|
||||||
tox_thread.join(); // wait for thread
|
|
||||||
return -1;
|
return -1;
|
||||||
} else if (bytes_received == 0) {
|
} else if (bytes_received == 0) {
|
||||||
std::cout << "got 0 bytes?\n";
|
std::cout << "got 0 bytes?\n";
|
||||||
@ -762,31 +627,9 @@ int main(void) {
|
|||||||
// apply changes (some) and gen vim inserts
|
// apply changes (some) and gen vim inserts
|
||||||
std::cout << "got fetch changes\n";
|
std::cout << "got fetch changes\n";
|
||||||
|
|
||||||
bool changes {false};
|
|
||||||
{ // apply changes
|
|
||||||
// TODO: make less locky, we dont need to hold both at the same time etc
|
|
||||||
//std::lock_guard lg {ctx.staging_mutex};
|
|
||||||
std::scoped_lock sl {ctx.staging_mutex, ctx.command_lists_mutex};
|
|
||||||
|
|
||||||
// TODO: limit number of ops for responsiveness
|
|
||||||
for (const auto& [agent, staging_seq] : ctx.staging_frontier) {
|
|
||||||
// check if remote newer
|
|
||||||
if (!ctx.command_frontier.count(agent) || ctx.command_frontier.at(agent) < staging_seq) {
|
|
||||||
for (uint64_t seq = ctx.command_frontier[agent]; seq <= staging_seq; seq++) {
|
|
||||||
|
|
||||||
// !! this can get expensive, while we are holding locks :/
|
|
||||||
bool r = doc.apply(ctx.command_lists.at(agent).at(seq).ops);
|
|
||||||
assert(r && "this should not happen");
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx.command_frontier[agent] = staging_seq;
|
|
||||||
changes = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
auto j_res_line_list = nlohmann::json::array();
|
auto j_res_line_list = nlohmann::json::array();
|
||||||
if (changes) { // external changes
|
|
||||||
|
if (true) { // external changes
|
||||||
const auto crdt_text = doc.getText();
|
const auto crdt_text = doc.getText();
|
||||||
std::string_view text_view {crdt_text};
|
std::string_view text_view {crdt_text};
|
||||||
for (int64_t i = 1; ; i++) {
|
for (int64_t i = 1; ; i++) {
|
||||||
@ -849,26 +692,6 @@ int main(void) {
|
|||||||
std::cout << "\n";
|
std::cout << "\n";
|
||||||
}
|
}
|
||||||
assert(doc.getText() == new_text);
|
assert(doc.getText() == new_text);
|
||||||
|
|
||||||
// TODO: make something less locky
|
|
||||||
// TODO: make method
|
|
||||||
{
|
|
||||||
std::lock_guard mg{ctx.command_lists_mutex};
|
|
||||||
assert(ctx.command_lists.size() == ctx.command_frontier.size());
|
|
||||||
|
|
||||||
auto& local_command_list = ctx.command_lists[ctx.agent];
|
|
||||||
uint64_t seq {0};
|
|
||||||
if (ctx.command_frontier.count(ctx.agent)) { // get last own seq
|
|
||||||
seq = ctx.command_frontier[ctx.agent] + 1;
|
|
||||||
}
|
|
||||||
local_command_list.emplace(seq, Command{
|
|
||||||
ctx.agent,
|
|
||||||
seq,
|
|
||||||
ops
|
|
||||||
});
|
|
||||||
ctx.command_frontier[ctx.agent] = seq;
|
|
||||||
}
|
|
||||||
ctx.should_gossip_local.store(true);
|
|
||||||
} else {
|
} else {
|
||||||
std::cout << "unknown command '" << command << "'\n";
|
std::cout << "unknown command '" << command << "'\n";
|
||||||
}
|
}
|
||||||
@ -897,66 +720,14 @@ static void self_connection_status_cb(Tox*, TOX_CONNECTION connection_status, vo
|
|||||||
std::cout << "self_connection_status_cb " << connection_status << "\n";
|
std::cout << "self_connection_status_cb " << connection_status << "\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
static void handle_pkg(SharedContext& ctx, const uint8_t* data, size_t length) {
|
|
||||||
if (length < 2) {
|
|
||||||
std::cerr << "got too short pkg " << length << "\n";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pkg::PKGID pkg_id = static_cast<pkg::PKGID>(data[0]);
|
|
||||||
const auto p_j = nlohmann::json::from_msgpack(data+1, data+1 + (length-1), true, false);
|
|
||||||
if (p_j.is_discarded()) {
|
|
||||||
std::cerr << "got invalid msgpack for " << pkg_id << "\n";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::cout << "pkg " << pkg_id << " j:" << p_j.dump() << "\n";
|
|
||||||
|
|
||||||
switch (pkg_id) {
|
|
||||||
case pkg::PKGID::FRONTIER: {
|
|
||||||
pkg::Frontier pkg = p_j;
|
|
||||||
|
|
||||||
if (!ctx.heard_gossip.count(pkg.agent) || ctx.heard_gossip[pkg.agent] < pkg.seq) {
|
|
||||||
ctx.heard_gossip[pkg.agent] = pkg.seq;
|
|
||||||
std::cout << "new seq " << pkg.seq << " from " << pkg.agent << "\n";
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case pkg::PKGID::REQUEST_FRONTIER: {
|
|
||||||
pkg::RequestFrontier pkg = p_j;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case pkg::PKGID::COMMAND: {
|
|
||||||
pkg::Command pkg = p_j;
|
|
||||||
|
|
||||||
// push to buffer, if not in buffer
|
|
||||||
if (!ctx.buffer[pkg.agent].count(pkg.seq)) {
|
|
||||||
ctx.buffer[pkg.agent].emplace(pkg.seq, pkg);
|
|
||||||
std::cout << "pushed to buffer " << pkg.seq << " from " << pkg.agent << "\n";
|
|
||||||
}
|
|
||||||
// TODO: notify something?
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case pkg::PKGID::REQUEST_COMMANDS: {
|
|
||||||
pkg::RequestCommands pkg = p_j;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
std::cerr << "unknown pkg id " << pkg_id << "\n";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void group_custom_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) {
|
static void group_custom_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) {
|
||||||
std::cout << "group_custom_packet_cb\n";
|
std::cout << "group_custom_packet_cb\n";
|
||||||
SharedContext& ctx = *static_cast<SharedContext*>(user_data);
|
SharedContext& ctx = *static_cast<SharedContext*>(user_data);
|
||||||
handle_pkg(ctx, data, length);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void group_custom_private_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) {
|
static void group_custom_private_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* data, size_t length, void* user_data) {
|
||||||
std::cout << "group_custom_private_packet_cb\n";
|
std::cout << "group_custom_private_packet_cb\n";
|
||||||
SharedContext& ctx = *static_cast<SharedContext*>(user_data);
|
SharedContext& ctx = *static_cast<SharedContext*>(user_data);
|
||||||
handle_pkg(ctx, data, length);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user