low level toxsync stuff all wired up, now only the highlevel sync logic is missing
This commit is contained in:
parent
45a3915985
commit
619ac3ad16
@ -1,6 +1,7 @@
|
||||
#include <solanaceae/plugin/solana_plugin_v1.h>
|
||||
|
||||
#include <solanaceae/crdtnotes/crdtnotes.hpp>
|
||||
#include <solanaceae/crdtnotes/crdtnotes_sync.hpp>
|
||||
//#include <solanaceae/util/config_model.hpp>
|
||||
|
||||
#include <memory>
|
||||
@ -10,6 +11,7 @@
|
||||
#define PROVIDE_INSTANCE(x, p, v) solana_api->provideInstance(#x, p, static_cast<x*>(v))
|
||||
|
||||
static std::unique_ptr<CRDTNotes> g_crdtn = nullptr;
|
||||
static std::unique_ptr<CRDTNotesSync> g_crdtns = nullptr;
|
||||
|
||||
extern "C" {
|
||||
|
||||
@ -42,10 +44,14 @@ SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api)
|
||||
// static store, could be anywhere tho
|
||||
// construct with fetched dependencies
|
||||
g_crdtn = std::make_unique<CRDTNotes>(/**conf*/);
|
||||
g_crdtns = std::make_unique<CRDTNotesSync>(/**conf*/);
|
||||
|
||||
// register types
|
||||
PROVIDE_INSTANCE(CRDTNotes, "CRDTNotes", g_crdtn.get());
|
||||
|
||||
PROVIDE_INSTANCE(CRDTNotesSync, "CRDTNotes", g_crdtns.get());
|
||||
PROVIDE_INSTANCE(CRDTNotesEventI, "CRDTNotes", g_crdtns.get());
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1,11 +1,13 @@
|
||||
#include <solanaceae/plugin/solana_plugin_v1.h>
|
||||
|
||||
#include <solanaceae/crdtnotes/crdtnotes.hpp>
|
||||
#include <solanaceae/crdtnotes/crdtnotes_sync.hpp>
|
||||
#include <solanaceae/crdtnotes_toxsync/crdtnotes_toxsync.hpp>
|
||||
//#include <solanaceae/util/config_model.hpp>
|
||||
#include <solanaceae/contact/contact_model3.hpp>
|
||||
#include <solanaceae/toxcore/tox_interface.hpp>
|
||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
|
||||
#include <memory>
|
||||
#include <iostream>
|
||||
@ -34,16 +36,20 @@ SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api)
|
||||
|
||||
//ConfigModelI* conf = nullptr;
|
||||
CRDTNotes* notes = nullptr;
|
||||
CRDTNotesEventI* notes_sync = nullptr;
|
||||
Contact3Registry* cr = nullptr;
|
||||
ToxI* t = nullptr;
|
||||
ToxEventProviderI* tep = nullptr;
|
||||
ToxContactModel2* tcm = nullptr;
|
||||
|
||||
{ // make sure required types are loaded
|
||||
//conf = RESOLVE_INSTANCE(ConfigModelI);
|
||||
notes = RESOLVE_INSTANCE(CRDTNotes);
|
||||
notes_sync = RESOLVE_INSTANCE(CRDTNotesEventI);
|
||||
cr = RESOLVE_INSTANCE(Contact3Registry);
|
||||
t = RESOLVE_INSTANCE(ToxI);
|
||||
tep = RESOLVE_INSTANCE(ToxEventProviderI);
|
||||
tcm = RESOLVE_INSTANCE(ToxContactModel2);
|
||||
|
||||
//if (conf == nullptr) {
|
||||
//std::cerr << "PLUGIN CRDTN missing ConfigModelI\n";
|
||||
@ -55,6 +61,11 @@ SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api)
|
||||
return 2;
|
||||
}
|
||||
|
||||
if (notes_sync == nullptr) {
|
||||
std::cerr << "PLUGIN CRDTNTS missing CRDTNotesEventI\n";
|
||||
return 2;
|
||||
}
|
||||
|
||||
if (cr == nullptr) {
|
||||
std::cerr << "PLUGIN CRDTNTS missing Contact3Registry\n";
|
||||
return 2;
|
||||
@ -69,11 +80,16 @@ SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api)
|
||||
std::cerr << "PLUGIN CRDTNTS missing ToxEventProviderI\n";
|
||||
return 2;
|
||||
}
|
||||
|
||||
if (tcm == nullptr) {
|
||||
std::cerr << "PLUGIN CRDTNTS missing ToxContactModel2\n";
|
||||
return 2;
|
||||
}
|
||||
}
|
||||
|
||||
// static store, could be anywhere tho
|
||||
// construct with fetched dependencies
|
||||
g_crdtn_ts = std::make_unique<CRDTNotesToxSync>(*notes, *cr, *t, *tep);
|
||||
g_crdtn_ts = std::make_unique<CRDTNotesToxSync>(*notes, *notes_sync, *cr, *t, *tep, *tcm);
|
||||
|
||||
// register types
|
||||
PROVIDE_INSTANCE(CRDTNotesToxSync, "CRDTNotesToxSync", g_crdtn_ts.get());
|
||||
|
@ -4,6 +4,8 @@ add_library(solanaceae_crdtnotes
|
||||
./solanaceae/crdtnotes/crdtnotes.hpp
|
||||
./solanaceae/crdtnotes/crdtnotes.cpp
|
||||
./solanaceae/crdtnotes/crdtnotes_contact_sync_model.hpp
|
||||
./solanaceae/crdtnotes/crdtnotes_sync.hpp
|
||||
./solanaceae/crdtnotes/crdtnotes_sync.cpp
|
||||
)
|
||||
target_include_directories(solanaceae_crdtnotes PUBLIC .)
|
||||
target_compile_features(solanaceae_crdtnotes PUBLIC cxx_std_17)
|
||||
|
@ -47,7 +47,6 @@ class CRDTNotes {
|
||||
std::default_random_engine _rng;
|
||||
|
||||
public:
|
||||
|
||||
// config?
|
||||
CRDTNotes(void);
|
||||
~CRDTNotes(void);
|
||||
|
17
src/solanaceae/crdtnotes/crdtnotes_sync.cpp
Normal file
17
src/solanaceae/crdtnotes/crdtnotes_sync.cpp
Normal file
@ -0,0 +1,17 @@
|
||||
#include "./crdtnotes_sync.hpp"
|
||||
|
||||
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) {
|
||||
}
|
||||
|
||||
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip_frontier&& e) {
|
||||
}
|
||||
|
||||
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_complete_frontier&& e) {
|
||||
}
|
||||
|
||||
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_op_range&& e) {
|
||||
}
|
||||
|
||||
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) {
|
||||
}
|
||||
|
78
src/solanaceae/crdtnotes/crdtnotes_sync.hpp
Normal file
78
src/solanaceae/crdtnotes/crdtnotes_sync.hpp
Normal file
@ -0,0 +1,78 @@
|
||||
#pragma once
|
||||
|
||||
#include "./crdtnotes.hpp"
|
||||
#include <solanaceae/contact/contact_model3.hpp>
|
||||
|
||||
namespace Events {
|
||||
|
||||
// - DocID
|
||||
struct NGCEXT_crdtns_gossip {
|
||||
Contact3Handle c;
|
||||
CRDTNotes::DocID doc_id;
|
||||
};
|
||||
|
||||
// - DocID
|
||||
// - array [
|
||||
// - AgentID
|
||||
// - seq (frontier)
|
||||
// - ]
|
||||
struct NGCEXT_crdtns_gossip_frontier {
|
||||
Contact3Handle c;
|
||||
CRDTNotes::DocID doc_id;
|
||||
std::vector<CRDTNotes::Frontier> selected_frontier;
|
||||
};
|
||||
|
||||
// - DocID
|
||||
struct NGCEXT_crdtns_fetch_complete_frontier {
|
||||
Contact3Handle c;
|
||||
CRDTNotes::DocID doc_id;
|
||||
};
|
||||
|
||||
// - DocID
|
||||
// - AgentID
|
||||
// - seq_from
|
||||
// - seq_to
|
||||
struct NGCEXT_crdtns_fetch_op_range {
|
||||
Contact3Handle c;
|
||||
CRDTNotes::DocID doc_id;
|
||||
CRDTNotes::CRDTAgent agent;
|
||||
uint64_t seq_from;
|
||||
uint64_t seq_to;
|
||||
};
|
||||
|
||||
// - DocID
|
||||
// - array [
|
||||
// - op
|
||||
// - ]
|
||||
struct NGCEXT_crdtns_ops {
|
||||
Contact3Handle c;
|
||||
CRDTNotes::DocID doc_id;
|
||||
std::vector<CRDTNotes::Doc::Op> ops;
|
||||
};
|
||||
|
||||
} // Events
|
||||
|
||||
// this is different than other "i might not handle this" event interfaces
|
||||
struct CRDTNotesEventI {
|
||||
virtual ~CRDTNotesEventI(void) {}
|
||||
|
||||
virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) = 0;
|
||||
virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip_frontier&& e) = 0;
|
||||
virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_complete_frontier&& e) = 0;
|
||||
virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_op_range&& e) = 0;
|
||||
virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) = 0;
|
||||
};
|
||||
|
||||
// high level sync logic
|
||||
// gets called on incoming packets
|
||||
// calls CRDTNotesContactSyncModelI on contacts
|
||||
class CRDTNotesSync final : public CRDTNotesEventI {
|
||||
|
||||
public:
|
||||
void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) override;
|
||||
void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip_frontier&& e) override;
|
||||
void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_complete_frontier&& e) override;
|
||||
void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_op_range&& e) override;
|
||||
void onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) override;
|
||||
};
|
||||
|
@ -4,6 +4,9 @@
|
||||
|
||||
#include <solanaceae/tox_contacts/components.hpp>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
// TODO: really need a good way to manage/negotiate packet ids
|
||||
enum class NGCEXT_Event : uint8_t {
|
||||
// - DocID
|
||||
CRDTN_GOSSIP = 0x80 | 0x10,
|
||||
@ -32,19 +35,39 @@ enum class NGCEXT_Event : uint8_t {
|
||||
CRDTN_OPS,
|
||||
};
|
||||
|
||||
#define _DATA_HAVE(x, error) if ((data_size - curser) < (x)) { error }
|
||||
|
||||
CRDTNotesToxSync::CRDTNotesToxSync(
|
||||
CRDTNotes& notes,
|
||||
CRDTNotesEventI& notes_sync,
|
||||
Contact3Registry& cr,
|
||||
ToxI& t,
|
||||
ToxEventProviderI& tep
|
||||
) : _notes(notes), _cr(cr), _t(t), _tep(tep) {
|
||||
ToxEventProviderI& tep,
|
||||
ToxContactModel2& tcm
|
||||
) : _notes(notes), _notes_sync(notes_sync), _cr(cr), _t(t), _tep(tep), _tcm(tcm) {
|
||||
// TODO: non groups
|
||||
|
||||
// should be called for every peer (except self)
|
||||
// we hook here to inject ourself as contact sync model
|
||||
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_JOIN);
|
||||
|
||||
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PACKET);
|
||||
_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET);
|
||||
}
|
||||
|
||||
CRDTNotesToxSync::~CRDTNotesToxSync(void) {
|
||||
// TODO: find a better way to remove dangling pointers
|
||||
std::vector<Contact3> to_remove_self;
|
||||
_cr.view<CRDTNotesContactSyncModelI*>().each([&to_remove_self, this](Contact3 c, const auto* csm) {
|
||||
if (this == csm) {
|
||||
to_remove_self.push_back(c);
|
||||
}
|
||||
});
|
||||
_cr.remove<CRDTNotesContactSyncModelI*>(to_remove_self.cbegin(), to_remove_self.cend());
|
||||
}
|
||||
|
||||
float CRDTNotesToxSync::iterate(float time_delta) {
|
||||
// TODO: do i actually need this?, logic is somewhere else and this is reactive only
|
||||
return 1.f; // TODO: 1sec for now, needs better logic
|
||||
}
|
||||
|
||||
@ -263,3 +286,293 @@ void CRDTNotesToxSync::SendOps(
|
||||
);
|
||||
}
|
||||
|
||||
bool CRDTNotesToxSync::parse_crdtn_gossip(
|
||||
Contact3Handle c,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool // dont care private
|
||||
) {
|
||||
Events::NGCEXT_crdtns_gossip e;
|
||||
e.c = c;
|
||||
|
||||
size_t curser = 0;
|
||||
|
||||
_DATA_HAVE(e.doc_id.size() * sizeof(decltype(e.doc_id)::value_type), std::cerr << "NGCEXT: packet too small, missing doc_id\n"; return false;)
|
||||
for (size_t i = 0; i < e.doc_id.size(); i++, curser++) {
|
||||
e.doc_id[i] = data[curser];
|
||||
}
|
||||
|
||||
//return dispatch(
|
||||
//NGCEXT_Event::FT1_REQUEST,
|
||||
//e
|
||||
//);
|
||||
|
||||
std::cout << "CRDTN gossip parsed\n";
|
||||
_notes_sync.onCRDTNSyncEvent(std::move(e));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CRDTNotesToxSync::parse_crdtn_gossip_frontier(
|
||||
Contact3Handle c,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool // dont care private
|
||||
) {
|
||||
Events::NGCEXT_crdtns_gossip_frontier e;
|
||||
e.c = c;
|
||||
|
||||
size_t curser = 0;
|
||||
|
||||
_DATA_HAVE(e.doc_id.size() * sizeof(decltype(e.doc_id)::value_type), std::cerr << "NGCEXT: packet too small, missing doc_id\n"; return false;)
|
||||
for (size_t i = 0; i < e.doc_id.size(); i++, curser++) {
|
||||
e.doc_id[i] = data[curser];
|
||||
}
|
||||
|
||||
// expect remaining size to be a multiple of agentid+seq
|
||||
|
||||
while (curser < data_size) {
|
||||
CRDTNotes::Frontier new_f;
|
||||
_DATA_HAVE(new_f.agent.size() * sizeof(CRDTNotes::CRDTAgent::value_type) + sizeof(new_f.seq), std::cerr << "NGCEXT: packet malformed, not enough data for forntier\n"; return false;)
|
||||
|
||||
for (size_t i = 0; i < new_f.agent.size(); i++, curser++) {
|
||||
new_f.agent[i] = data[curser];
|
||||
}
|
||||
|
||||
new_f.seq = 0;
|
||||
for (size_t i = 0; i < sizeof(new_f.seq); i++, curser++) {
|
||||
new_f.seq |= uint64_t(data[curser]) << i*8;
|
||||
}
|
||||
|
||||
e.selected_frontier.emplace_back(std::move(new_f));
|
||||
}
|
||||
|
||||
std::cout << "CRDTN gossip_frontier parsed\n";
|
||||
_notes_sync.onCRDTNSyncEvent(std::move(e));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CRDTNotesToxSync::parse_crdtn_fetch_complete_frontier(
|
||||
Contact3Handle c,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool // dont care private
|
||||
) {
|
||||
Events::NGCEXT_crdtns_fetch_complete_frontier e;
|
||||
e.c = c;
|
||||
|
||||
size_t curser = 0;
|
||||
|
||||
_DATA_HAVE(e.doc_id.size() * sizeof(decltype(e.doc_id)::value_type), std::cerr << "NGCEXT: packet too small, missing doc_id\n"; return false;)
|
||||
for (size_t i = 0; i < e.doc_id.size(); i++, curser++) {
|
||||
e.doc_id[i] = data[curser];
|
||||
}
|
||||
|
||||
std::cout << "CRDTN fetch_complete_frontier parsed\n";
|
||||
_notes_sync.onCRDTNSyncEvent(std::move(e));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CRDTNotesToxSync::parse_crdtn_fetch_op_range(
|
||||
Contact3Handle c,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool // dont care private
|
||||
) {
|
||||
Events::NGCEXT_crdtns_fetch_op_range e;
|
||||
e.c = c;
|
||||
|
||||
size_t curser = 0;
|
||||
|
||||
_DATA_HAVE(e.doc_id.size() * sizeof(decltype(e.doc_id)::value_type), std::cerr << "NGCEXT: packet too small, missing doc_id\n"; return false;)
|
||||
for (size_t i = 0; i < e.doc_id.size(); i++, curser++) {
|
||||
e.doc_id[i] = data[curser];
|
||||
}
|
||||
|
||||
_DATA_HAVE(e.agent.size() * sizeof(decltype(e.agent)::value_type), std::cerr << "NGCEXT: packet too small, missing agent\n"; return false;)
|
||||
for (size_t i = 0; i < e.agent.size(); i++, curser++) {
|
||||
e.agent[i] = data[curser];
|
||||
}
|
||||
|
||||
_DATA_HAVE(sizeof(e.seq_from), std::cerr << "NGCEXT: packet too small, missing seq_from\n"; return false;)
|
||||
e.seq_from = 0;
|
||||
for (size_t i = 0; i < sizeof(e.seq_from); i++, curser++) {
|
||||
e.seq_from |= uint64_t(data[curser]) << i*8;
|
||||
}
|
||||
|
||||
_DATA_HAVE(sizeof(e.seq_to), std::cerr << "NGCEXT: packet too small, missing seq_to\n"; return false;)
|
||||
e.seq_to = 0;
|
||||
for (size_t i = 0; i < sizeof(e.seq_to); i++, curser++) {
|
||||
e.seq_to |= uint64_t(data[curser]) << i*8;
|
||||
}
|
||||
|
||||
std::cout << "CRDTN fetch_op_range parsed\n";
|
||||
_notes_sync.onCRDTNSyncEvent(std::move(e));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CRDTNotesToxSync::parse_crdtn_ops(
|
||||
Contact3Handle c,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool // dont care private
|
||||
) {
|
||||
Events::NGCEXT_crdtns_ops e;
|
||||
e.c = c;
|
||||
|
||||
size_t curser = 0;
|
||||
|
||||
_DATA_HAVE(e.doc_id.size() * sizeof(decltype(e.doc_id)::value_type), std::cerr << "NGCEXT: packet too small, missing doc_id\n"; return false;)
|
||||
for (size_t i = 0; i < e.doc_id.size(); i++, curser++) {
|
||||
e.doc_id[i] = data[curser];
|
||||
}
|
||||
|
||||
// an op is atleast 1+32+8 (41) (del)
|
||||
while (curser < data_size) {
|
||||
_DATA_HAVE(1, std::cerr << "NGCEXT: packet too small, missing optype\n"; return false;)
|
||||
const uint8_t op_type = data[curser++];
|
||||
if (op_type == 0x00) { // op add
|
||||
CRDTNotes::Doc::OpAdd new_op;
|
||||
|
||||
_DATA_HAVE(new_op.id.id.size() * sizeof(decltype(new_op.id.id)::value_type), std::cerr << "NGCEXT: packet malformed, agent for op to small\n"; return false;)
|
||||
for (size_t i = 0; i < new_op.id.id.size(); i++, curser++) {
|
||||
new_op.id.id[i] = data[curser];
|
||||
}
|
||||
|
||||
_DATA_HAVE(sizeof(new_op.id.seq), std::cerr << "NGCEXT: packet malformed, missing seq for op\n"; return false;)
|
||||
new_op.id.seq = 0;
|
||||
for (size_t i = 0; i < sizeof(new_op.id.seq); i++, curser++) {
|
||||
new_op.id.seq |= uint64_t(data[curser]) << i*8;
|
||||
}
|
||||
|
||||
_DATA_HAVE(sizeof(new_op.value), std::cerr << "NGCEXT: packet malformed, missing value for op\n"; return false;)
|
||||
static_assert(sizeof(new_op.value) == 1);
|
||||
new_op.value = data[curser++];
|
||||
|
||||
_DATA_HAVE(1, std::cerr << "NGCEXT: packet too small, missing has_parent_left\n"; return false;)
|
||||
const uint8_t has_parent_left = data[curser++];
|
||||
if (has_parent_left == 0x01) { // TODO: test for other values
|
||||
new_op.parent_left.emplace();
|
||||
_DATA_HAVE(new_op.parent_left.value().id.size() * sizeof(decltype(new_op.parent_left.value().id)::value_type), std::cerr << "NGCEXT: packet malformed, agent for parent left for op to small\n"; return false;)
|
||||
for (size_t i = 0; i < new_op.parent_left.value().id.size(); i++, curser++) {
|
||||
new_op.parent_left.value().id[i] = data[curser];
|
||||
}
|
||||
|
||||
_DATA_HAVE(sizeof(new_op.parent_left.value().seq), std::cerr << "NGCEXT: packet malformed, missing seq for parent_left for op\n"; return false;)
|
||||
new_op.parent_left.value().seq = 0;
|
||||
for (size_t i = 0; i < sizeof(new_op.parent_left.value().seq); i++, curser++) {
|
||||
new_op.parent_left.value().seq |= uint64_t(data[curser]) << i*8;
|
||||
}
|
||||
}
|
||||
|
||||
_DATA_HAVE(1, std::cerr << "NGCEXT: packet too small, missing has_parent_right\n"; return false;)
|
||||
const uint8_t has_parent_right = data[curser++];
|
||||
if (has_parent_right == 0x01) { // TODO: test for other values
|
||||
new_op.parent_right.emplace();
|
||||
_DATA_HAVE(new_op.parent_right.value().id.size() * sizeof(decltype(new_op.parent_right.value().id)::value_type), std::cerr << "NGCEXT: packet malformed, agent for parent_right for op to small\n"; return false;)
|
||||
for (size_t i = 0; i < new_op.parent_right.value().id.size(); i++, curser++) {
|
||||
new_op.parent_right.value().id[i] = data[curser];
|
||||
}
|
||||
|
||||
_DATA_HAVE(sizeof(new_op.parent_right.value().seq), std::cerr << "NGCEXT: packet malformed, missing seq for parent_right for op\n"; return false;)
|
||||
new_op.parent_right.value().seq = 0;
|
||||
for (size_t i = 0; i < sizeof(new_op.parent_right.value().seq); i++, curser++) {
|
||||
new_op.parent_right.value().seq |= uint64_t(data[curser]) << i*8;
|
||||
}
|
||||
}
|
||||
|
||||
e.ops.emplace_back(std::move(new_op));
|
||||
} else if (op_type == 0x01) { // op del
|
||||
CRDTNotes::Doc::OpDel new_op;
|
||||
|
||||
_DATA_HAVE(new_op.id.id.size() * sizeof(decltype(new_op.id.id)::value_type), std::cerr << "NGCEXT: packet malformed, agent for op to small\n"; return false;)
|
||||
for (size_t i = 0; i < new_op.id.id.size(); i++, curser++) {
|
||||
new_op.id.id[i] = data[curser];
|
||||
}
|
||||
|
||||
_DATA_HAVE(sizeof(new_op.id.seq), std::cerr << "NGCEXT: packet malformed, missing seq for op\n"; return false;)
|
||||
new_op.id.seq = 0;
|
||||
for (size_t i = 0; i < sizeof(new_op.id.seq); i++, curser++) {
|
||||
new_op.id.seq |= uint64_t(data[curser]) << i*8;
|
||||
}
|
||||
|
||||
e.ops.emplace_back(std::move(new_op));
|
||||
} else {
|
||||
std::cerr << "NGCEXT: packet malformed, unknown optype " << (int)op_type << "\n";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
std::cout << "CRDTN ops parsed (count:" << e.ops.size() << ")\n";
|
||||
_notes_sync.onCRDTNSyncEvent(std::move(e));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CRDTNotesToxSync::handlePacket(
|
||||
const uint32_t group_number,
|
||||
const uint32_t peer_number,
|
||||
const uint8_t* data,
|
||||
const size_t data_size,
|
||||
const bool _private
|
||||
) {
|
||||
if (data_size < 1) {
|
||||
return false; // waht
|
||||
}
|
||||
|
||||
// tcm id to c
|
||||
auto c = _tcm.getContactGroupPeer(group_number, peer_number);
|
||||
if (!c.valid()) {
|
||||
// HUH?
|
||||
return false;
|
||||
}
|
||||
|
||||
NGCEXT_Event pkg_type = static_cast<NGCEXT_Event>(data[0]);
|
||||
|
||||
switch (pkg_type) {
|
||||
case NGCEXT_Event::CRDTN_GOSSIP:
|
||||
return parse_crdtn_gossip(c, data+1, data_size-1, _private);
|
||||
case NGCEXT_Event::CRDTN_GOSSIP_FRONTIER:
|
||||
return parse_crdtn_gossip_frontier(c, data+1, data_size-1, _private);
|
||||
case NGCEXT_Event::CRDTN_FETCH_COMPLETE_FRONTIER:
|
||||
return parse_crdtn_fetch_complete_frontier(c, data+1, data_size-1, _private);
|
||||
case NGCEXT_Event::CRDTN_FETCH_OP_RANGE:
|
||||
return parse_crdtn_fetch_op_range(c, data+1, data_size-1, _private);
|
||||
case NGCEXT_Event::CRDTN_OPS:
|
||||
return parse_crdtn_ops(c, data+1, data_size-1, _private);
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CRDTNotesToxSync::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
|
||||
// TODO: replace with actual contact events
|
||||
const auto group_number = tox_event_group_peer_join_get_group_number(e);
|
||||
const auto peer_number = tox_event_group_peer_join_get_peer_id(e);
|
||||
|
||||
// tcm id to c
|
||||
auto c = _tcm.getContactGroupPeer(group_number, peer_number);
|
||||
if (!c.valid()) {
|
||||
// HUH?
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: find a better way
|
||||
c.emplace_or_replace<CRDTNotesContactSyncModelI*>(this);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CRDTNotesToxSync::onToxEvent(const Tox_Event_Group_Custom_Packet* e) {
|
||||
const auto group_number = tox_event_group_custom_packet_get_group_number(e);
|
||||
const auto peer_number = tox_event_group_custom_packet_get_peer_id(e);
|
||||
const uint8_t* data = tox_event_group_custom_packet_get_data(e);
|
||||
const auto data_length = tox_event_group_custom_packet_get_data_length(e);
|
||||
return handlePacket(group_number, peer_number, data, data_length, false);
|
||||
}
|
||||
|
||||
bool CRDTNotesToxSync::onToxEvent(const Tox_Event_Group_Custom_Private_Packet* e) {
|
||||
const auto group_number = tox_event_group_custom_private_packet_get_group_number(e);
|
||||
const auto peer_number = tox_event_group_custom_private_packet_get_peer_id(e);
|
||||
const uint8_t* data = tox_event_group_custom_private_packet_get_data(e);
|
||||
const auto data_length = tox_event_group_custom_private_packet_get_data_length(e);
|
||||
return handlePacket(group_number, peer_number, data, data_length, true);
|
||||
}
|
||||
|
||||
#undef _DATA_HAVE
|
||||
|
||||
|
@ -1,9 +1,11 @@
|
||||
#pragma once
|
||||
|
||||
#include "solanaceae/crdtnotes/crdtnotes_sync.hpp"
|
||||
#include <solanaceae/crdtnotes/crdtnotes.hpp>
|
||||
#include <solanaceae/crdtnotes/crdtnotes_contact_sync_model.hpp>
|
||||
#include <solanaceae/contact/contact_model3.hpp>
|
||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
|
||||
// fwd
|
||||
struct ToxI;
|
||||
@ -12,16 +14,20 @@ struct ToxEventProviderI;
|
||||
// implements CRDTNotesContactSyncModelI and attaches itself to tox contacts
|
||||
class CRDTNotesToxSync : public CRDTNotesContactSyncModelI, public ToxEventI {
|
||||
CRDTNotes& _notes;
|
||||
CRDTNotesEventI& _notes_sync;
|
||||
Contact3Registry& _cr;
|
||||
ToxI& _t;
|
||||
ToxEventProviderI& _tep;
|
||||
ToxContactModel2& _tcm;
|
||||
|
||||
public:
|
||||
CRDTNotesToxSync(
|
||||
CRDTNotes& notes,
|
||||
CRDTNotesEventI& notes_sync,
|
||||
Contact3Registry& cr,
|
||||
ToxI& t,
|
||||
ToxEventProviderI& tep
|
||||
ToxEventProviderI& tep,
|
||||
ToxContactModel2& tcm
|
||||
);
|
||||
~CRDTNotesToxSync(void);
|
||||
|
||||
@ -57,5 +63,45 @@ class CRDTNotesToxSync : public CRDTNotesContactSyncModelI, public ToxEventI {
|
||||
const CRDTNotes::DocID& doc_id,
|
||||
const std::vector<CRDTNotes::Doc::Op>&
|
||||
) override;
|
||||
|
||||
private:
|
||||
bool parse_crdtn_gossip(
|
||||
Contact3Handle c,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool _private
|
||||
);
|
||||
bool parse_crdtn_gossip_frontier(
|
||||
Contact3Handle c,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool _private
|
||||
);
|
||||
bool parse_crdtn_fetch_complete_frontier(
|
||||
Contact3Handle c,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool _private
|
||||
);
|
||||
bool parse_crdtn_fetch_op_range(
|
||||
Contact3Handle c,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool _private
|
||||
);
|
||||
bool parse_crdtn_ops(
|
||||
Contact3Handle c,
|
||||
const uint8_t* data, size_t data_size,
|
||||
bool _private
|
||||
);
|
||||
|
||||
bool handlePacket(
|
||||
const uint32_t group_number,
|
||||
const uint32_t peer_number,
|
||||
const uint8_t* data,
|
||||
const size_t data_size,
|
||||
const bool _private
|
||||
);
|
||||
|
||||
protected: // tox events
|
||||
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
|
||||
bool onToxEvent(const Tox_Event_Group_Custom_Packet* e) override;
|
||||
bool onToxEvent(const Tox_Event_Group_Custom_Private_Packet* e) override;
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user