Compare commits

...

1 Commits

Author SHA1 Message Date
Green Sky
f41bfeaa65 rename fetch ops to fetch add
add delnum to frontier and fetch del
2025-09-08 10:41:03 +02:00
6 changed files with 127 additions and 23 deletions

View File

@@ -37,6 +37,7 @@ class CRDTNotes {
struct Frontier { // newest known seq for given agent struct Frontier { // newest known seq for given agent
CRDTAgent agent; CRDTAgent agent;
uint64_t seq{0}; uint64_t seq{0};
uint64_t del_num{0};
}; };
// RAII lock wrapper // RAII lock wrapper

View File

@@ -19,7 +19,7 @@ struct CRDTNotesContactSyncModelI {
virtual void SendGossip( virtual void SendGossip(
ContactHandle4 c, ContactHandle4 c,
const CRDTNotes::DocID& doc_id, const CRDTNotes::DocID& doc_id,
const std::vector<CRDTNotes::Frontier>& selected_frontier const std::vector<CRDTNotes::Frontier>& frontier
) = 0; ) = 0;
// fetch // fetch
@@ -31,7 +31,7 @@ struct CRDTNotesContactSyncModelI {
) = 0; ) = 0;
// action range request // action range request
virtual void SendFetchOps( virtual void SendFetchAddRange(
ContactHandle4 c, ContactHandle4 c,
const CRDTNotes::DocID& doc_id, const CRDTNotes::DocID& doc_id,
const CRDTNotes::CRDTAgent& agent, const CRDTNotes::CRDTAgent& agent,
@@ -39,6 +39,12 @@ struct CRDTNotesContactSyncModelI {
const uint64_t seq_to const uint64_t seq_to
) = 0; ) = 0;
virtual void SendFetchDel(
ContactHandle4 c,
const CRDTNotes::DocID& doc_id,
const CRDTNotes::CRDTAgent& agent
) = 0;
public: // ops response public: // ops response
virtual void SendOps( virtual void SendOps(
ContactHandle4 c, ContactHandle4 c,

View File

@@ -324,7 +324,10 @@ void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip_frontier&& e)
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_complete_frontier&& e) { void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_complete_frontier&& e) {
} }
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_op_range&& e) { void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_add_range&& e) {
}
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_del&& e) {
} }
void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) { void CRDTNotesSync::onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) {

View File

@@ -27,11 +27,12 @@ namespace Events {
// - array [ // - array [
// - AgentID // - AgentID
// - seq (frontier) // - seq (frontier)
// - del_num
// - ] // - ]
struct NGCEXT_crdtns_gossip_frontier { struct NGCEXT_crdtns_gossip_frontier {
ContactHandle4 c; ContactHandle4 c;
CRDTNotes::DocID doc_id; CRDTNotes::DocID doc_id;
std::vector<CRDTNotes::Frontier> selected_frontier; std::vector<CRDTNotes::Frontier> frontier;
}; };
// - DocID // - DocID
@@ -44,7 +45,7 @@ namespace Events {
// - AgentID // - AgentID
// - seq_from // - seq_from
// - seq_to // - seq_to
struct NGCEXT_crdtns_fetch_op_range { struct NGCEXT_crdtns_fetch_add_range {
ContactHandle4 c; ContactHandle4 c;
CRDTNotes::DocID doc_id; CRDTNotes::DocID doc_id;
CRDTNotes::CRDTAgent agent; CRDTNotes::CRDTAgent agent;
@@ -52,6 +53,14 @@ namespace Events {
uint64_t seq_to; uint64_t seq_to;
}; };
// - DocID
// - AgentID
struct NGCEXT_crdtns_fetch_del {
ContactHandle4 c;
CRDTNotes::DocID doc_id;
CRDTNotes::CRDTAgent agent;
};
// - DocID // - DocID
// - array [ // - array [
// - op // - op
@@ -76,7 +85,8 @@ struct CRDTNotesEventI {
virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) = 0; virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) = 0;
virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip_frontier&& e) = 0; virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip_frontier&& e) = 0;
virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_complete_frontier&& e) = 0; virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_complete_frontier&& e) = 0;
virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_op_range&& e) = 0; virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_add_range&& e) = 0;
virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_del&& e) = 0;
virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) = 0; virtual void onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) = 0;
}; };
@@ -133,7 +143,8 @@ class CRDTNotesSync final : public CRDTNotesEventI {
void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) override; void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip&& e) override;
void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip_frontier&& e) override; void onCRDTNSyncEvent(Events::NGCEXT_crdtns_gossip_frontier&& e) override;
void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_complete_frontier&& e) override; void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_complete_frontier&& e) override;
void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_op_range&& e) override; void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_add_range&& e) override;
void onCRDTNSyncEvent(Events::NGCEXT_crdtns_fetch_del&& e) override;
void onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) override; void onCRDTNSyncEvent(Events::NGCEXT_crdtns_ops&& e) override;
}; };

View File

@@ -16,6 +16,7 @@ enum class NGCEXT_Event : uint8_t {
// - array [ // - array [
// - AgentID // - AgentID
// - seq (frontier) // - seq (frontier)
// - del_num
// - ] // - ]
CRDTN_GOSSIP_FRONTIER, CRDTN_GOSSIP_FRONTIER,
@@ -26,7 +27,11 @@ enum class NGCEXT_Event : uint8_t {
// - AgentID // - AgentID
// - seq_from // - seq_from
// - seq_to // - seq_to
CRDTN_FETCH_OP_RANGE, CRDTN_FETCH_ADD_RANGE,
// - DocID
// - AgentID
CRDTN_FETCH_DEL,
// - DocID // - DocID
// - array [ // - array [
@@ -101,7 +106,7 @@ void CRDTNotesToxSync::SendGossip(
void CRDTNotesToxSync::SendGossip( void CRDTNotesToxSync::SendGossip(
ContactHandle4 c, ContactHandle4 c,
const CRDTNotes::DocID& doc_id, const CRDTNotes::DocID& doc_id,
const std::vector<CRDTNotes::Frontier>& selected_frontier const std::vector<CRDTNotes::Frontier>& frontier
) { ) {
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) { if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
return; return;
@@ -117,7 +122,7 @@ void CRDTNotesToxSync::SendGossip(
} }
// +32 // +32
for (const auto& [f_id, f_seq] : selected_frontier) { for (const auto& [f_id, f_seq, del_num] : frontier) {
for (const uint8_t v : f_id) { for (const uint8_t v : f_id) {
pkg.push_back(v); pkg.push_back(v);
} }
@@ -127,8 +132,13 @@ void CRDTNotesToxSync::SendGossip(
pkg.push_back((f_seq >> i*8) & 0xff); pkg.push_back((f_seq >> i*8) & 0xff);
} }
// +8 // +8
for (size_t i = 0; i < sizeof(del_num); i++) {
pkg.push_back((del_num >> i*8) & 0xff);
}
// +8
} }
// +40 // +48
// send // send
const auto& gp = c.get<Contact::Components::ToxGroupPeerEphemeral>(); const auto& gp = c.get<Contact::Components::ToxGroupPeerEphemeral>();
@@ -164,7 +174,7 @@ void CRDTNotesToxSync::SendFetchCompleteFrontier(
); );
} }
void CRDTNotesToxSync::SendFetchOps( void CRDTNotesToxSync::SendFetchAddRange(
ContactHandle4 c, ContactHandle4 c,
const CRDTNotes::DocID& doc_id, const CRDTNotes::DocID& doc_id,
const CRDTNotes::CRDTAgent& agent, const CRDTNotes::CRDTAgent& agent,
@@ -177,7 +187,7 @@ void CRDTNotesToxSync::SendFetchOps(
std::vector<uint8_t> pkg; std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::CRDTN_FETCH_OP_RANGE)); pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::CRDTN_FETCH_ADD_RANGE));
for (const uint8_t v : doc_id) { for (const uint8_t v : doc_id) {
pkg.push_back(v); pkg.push_back(v);
@@ -207,6 +217,35 @@ void CRDTNotesToxSync::SendFetchOps(
); );
} }
void CRDTNotesToxSync::SendFetchDel(
ContactHandle4 c,
const CRDTNotes::DocID& doc_id,
const CRDTNotes::CRDTAgent& agent
) {
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
return;
}
std::vector<uint8_t> pkg;
pkg.push_back(static_cast<uint8_t>(NGCEXT_Event::CRDTN_FETCH_DEL));
for (const uint8_t v : doc_id) {
pkg.push_back(v);
}
for (const uint8_t v : agent) {
pkg.push_back(v);
}
const auto& gp = c.get<Contact::Components::ToxGroupPeerEphemeral>();
_t.toxGroupSendCustomPrivatePacket(
gp.group_number, gp.peer_number,
true,
pkg
);
}
void CRDTNotesToxSync::SendOps( void CRDTNotesToxSync::SendOps(
ContactHandle4 c, ContactHandle4 c,
const CRDTNotes::DocID& doc_id, const CRDTNotes::DocID& doc_id,
@@ -228,6 +267,7 @@ void CRDTNotesToxSync::SendOps(
// this is very inefficent // this is very inefficent
// a full add op is 124bytes like this // a full add op is 124bytes like this
// a full del op is 41bytes
for (const auto& op : ops) { for (const auto& op : ops) {
if(std::holds_alternative<CRDTNotes::Doc::OpAdd>(op)) { if(std::holds_alternative<CRDTNotes::Doc::OpAdd>(op)) {
const auto& add_op = std::get<CRDTNotes::Doc::OpAdd>(op); const auto& add_op = std::get<CRDTNotes::Doc::OpAdd>(op);
@@ -332,7 +372,7 @@ bool CRDTNotesToxSync::parse_crdtn_gossip_frontier(
while (curser < data_size) { while (curser < data_size) {
CRDTNotes::Frontier new_f; CRDTNotes::Frontier new_f;
_DATA_HAVE(new_f.agent.size() * sizeof(CRDTNotes::CRDTAgent::value_type) + sizeof(new_f.seq), std::cerr << "NGCEXT: packet malformed, not enough data for forntier\n"; return false;) _DATA_HAVE(new_f.agent.size() * sizeof(CRDTNotes::CRDTAgent::value_type) + sizeof(new_f.seq) + sizeof(new_f.del_num), std::cerr << "NGCEXT: packet malformed, not enough data for frontier\n"; return false;)
for (size_t i = 0; i < new_f.agent.size(); i++, curser++) { for (size_t i = 0; i < new_f.agent.size(); i++, curser++) {
new_f.agent[i] = data[curser]; new_f.agent[i] = data[curser];
@@ -343,7 +383,12 @@ bool CRDTNotesToxSync::parse_crdtn_gossip_frontier(
new_f.seq |= uint64_t(data[curser]) << i*8; new_f.seq |= uint64_t(data[curser]) << i*8;
} }
e.selected_frontier.emplace_back(std::move(new_f)); new_f.del_num = 0;
for (size_t i = 0; i < sizeof(new_f.del_num); i++, curser++) {
new_f.del_num |= uint64_t(data[curser]) << i*8;
}
e.frontier.emplace_back(std::move(new_f));
} }
std::cout << "CRDTN gossip_frontier parsed\n"; std::cout << "CRDTN gossip_frontier parsed\n";
@@ -371,12 +416,12 @@ bool CRDTNotesToxSync::parse_crdtn_fetch_complete_frontier(
return true; return true;
} }
bool CRDTNotesToxSync::parse_crdtn_fetch_op_range( bool CRDTNotesToxSync::parse_crdtn_fetch_add_range(
ContactHandle4 c, ContactHandle4 c,
const uint8_t* data, size_t data_size, const uint8_t* data, size_t data_size,
bool // dont care private bool // dont care private
) { ) {
Events::NGCEXT_crdtns_fetch_op_range e; Events::NGCEXT_crdtns_fetch_add_range e;
e.c = c; e.c = c;
size_t curser = 0; size_t curser = 0;
@@ -403,7 +448,32 @@ bool CRDTNotesToxSync::parse_crdtn_fetch_op_range(
e.seq_to |= uint64_t(data[curser]) << i*8; e.seq_to |= uint64_t(data[curser]) << i*8;
} }
std::cout << "CRDTN fetch_op_range parsed\n"; std::cout << "CRDTN fetch_add_range parsed\n";
_notes_sync.onCRDTNSyncEvent(std::move(e));
return true;
}
bool CRDTNotesToxSync::parse_crdtn_fetch_del(
ContactHandle4 c,
const uint8_t* data, size_t data_size,
bool // dont care private
) {
Events::NGCEXT_crdtns_fetch_del e;
e.c = c;
size_t curser = 0;
_DATA_HAVE(e.doc_id.size() * sizeof(decltype(e.doc_id)::value_type), std::cerr << "NGCEXT: packet too small, missing doc_id\n"; return false;)
for (size_t i = 0; i < e.doc_id.size(); i++, curser++) {
e.doc_id[i] = data[curser];
}
_DATA_HAVE(e.agent.size() * sizeof(decltype(e.agent)::value_type), std::cerr << "NGCEXT: packet too small, missing agent\n"; return false;)
for (size_t i = 0; i < e.agent.size(); i++, curser++) {
e.agent[i] = data[curser];
}
std::cout << "CRDTN fetch_del parsed\n";
_notes_sync.onCRDTNSyncEvent(std::move(e)); _notes_sync.onCRDTNSyncEvent(std::move(e));
return true; return true;
} }
@@ -531,8 +601,10 @@ bool CRDTNotesToxSync::handlePacket(
return parse_crdtn_gossip_frontier(c, data+1, data_size-1, _private); return parse_crdtn_gossip_frontier(c, data+1, data_size-1, _private);
case NGCEXT_Event::CRDTN_FETCH_COMPLETE_FRONTIER: case NGCEXT_Event::CRDTN_FETCH_COMPLETE_FRONTIER:
return parse_crdtn_fetch_complete_frontier(c, data+1, data_size-1, _private); return parse_crdtn_fetch_complete_frontier(c, data+1, data_size-1, _private);
case NGCEXT_Event::CRDTN_FETCH_OP_RANGE: case NGCEXT_Event::CRDTN_FETCH_ADD_RANGE:
return parse_crdtn_fetch_op_range(c, data+1, data_size-1, _private); return parse_crdtn_fetch_add_range(c, data+1, data_size-1, _private);
case NGCEXT_Event::CRDTN_FETCH_DEL:
return parse_crdtn_fetch_del(c, data+1, data_size-1, _private);
case NGCEXT_Event::CRDTN_OPS: case NGCEXT_Event::CRDTN_OPS:
return parse_crdtn_ops(c, data+1, data_size-1, _private); return parse_crdtn_ops(c, data+1, data_size-1, _private);
default: default:

View File

@@ -39,7 +39,7 @@ class CRDTNotesToxSync : public CRDTNotesContactSyncModelI, public ToxEventI {
void SendGossip( void SendGossip(
ContactHandle4 c, ContactHandle4 c,
const CRDTNotes::DocID& doc_id, const CRDTNotes::DocID& doc_id,
const std::vector<CRDTNotes::Frontier>& selected_frontier const std::vector<CRDTNotes::Frontier>& frontier
) override; ) override;
void SendFetchCompleteFrontier( void SendFetchCompleteFrontier(
@@ -47,7 +47,7 @@ class CRDTNotesToxSync : public CRDTNotesContactSyncModelI, public ToxEventI {
const CRDTNotes::DocID& doc_id const CRDTNotes::DocID& doc_id
) override; ) override;
void SendFetchOps( void SendFetchAddRange(
ContactHandle4 c, ContactHandle4 c,
const CRDTNotes::DocID& doc_id, const CRDTNotes::DocID& doc_id,
const CRDTNotes::CRDTAgent& agent, const CRDTNotes::CRDTAgent& agent,
@@ -55,6 +55,12 @@ class CRDTNotesToxSync : public CRDTNotesContactSyncModelI, public ToxEventI {
const uint64_t seq_to const uint64_t seq_to
) override; ) override;
void SendFetchDel(
ContactHandle4 c,
const CRDTNotes::DocID& doc_id,
const CRDTNotes::CRDTAgent& agent
) override;
void SendOps( void SendOps(
ContactHandle4 c, ContactHandle4 c,
const CRDTNotes::DocID& doc_id, const CRDTNotes::DocID& doc_id,
@@ -77,7 +83,12 @@ class CRDTNotesToxSync : public CRDTNotesContactSyncModelI, public ToxEventI {
const uint8_t* data, size_t data_size, const uint8_t* data, size_t data_size,
bool _private bool _private
); );
bool parse_crdtn_fetch_op_range( bool parse_crdtn_fetch_add_range(
ContactHandle4 c,
const uint8_t* data, size_t data_size,
bool _private
);
bool parse_crdtn_fetch_del(
ContactHandle4 c, ContactHandle4 c,
const uint8_t* data, size_t data_size, const uint8_t* data, size_t data_size,
bool _private bool _private