rizzler working, more fixes everywhere
there still are some crashes that needs workarounds
This commit is contained in:
parent
a139f412b1
commit
930c829031
@ -134,6 +134,8 @@ endif()
|
||||
########################################
|
||||
|
||||
add_library(solanaceae_ngchs2
|
||||
./solanaceae/ngc_hs2/serl.hpp
|
||||
|
||||
./solanaceae/ngc_hs2/ts_find_start.hpp
|
||||
|
||||
./solanaceae/ngc_hs2/ngc_hs2_sigma.hpp
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
|
||||
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||
#include <solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp> // this hurts
|
||||
#include <solanaceae/ngc_hs2/ngc_hs2_sigma.hpp>
|
||||
#include <solanaceae/ngc_hs2/ngc_hs2_rizzler.hpp>
|
||||
|
||||
@ -43,11 +44,12 @@ SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api)
|
||||
auto* rmm = PLUG_RESOLVE_INSTANCE(RegistryMessageModelI);
|
||||
auto* tcm = PLUG_RESOLVE_INSTANCE(ToxContactModel2);
|
||||
auto* ngcft1 = PLUG_RESOLVE_INSTANCE(NGCFT1);
|
||||
auto* sha1_ngcft1 = PLUG_RESOLVE_INSTANCE(SHA1_NGCFT1);
|
||||
|
||||
// static store, could be anywhere tho
|
||||
// construct with fetched dependencies
|
||||
g_ngchs2s = std::make_unique<NGCHS2Sigma>(*cr, *rmm, *tcm, *ngcft1);
|
||||
g_ngchs2r = std::make_unique<NGCHS2Rizzler>(*cr, *rmm, *tcm, *ngcft1, *tox_event_provider_i);
|
||||
g_ngchs2r = std::make_unique<NGCHS2Rizzler>(*cr, *rmm, *tcm, *ngcft1, *tox_event_provider_i, *sha1_ngcft1);
|
||||
|
||||
// register types
|
||||
PLUG_PROVIDE_INSTANCE(NGCHS2Sigma, plugin_name, g_ngchs2s.get());
|
||||
|
@ -28,7 +28,8 @@ void re_announce(
|
||||
// if not downloading or info incomplete -> remove
|
||||
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) {
|
||||
to_remove.push_back(ov);
|
||||
assert(false && "transfer in broken state");
|
||||
// TODO: triggers with hs, figure out why
|
||||
//assert(false && "transfer in broken state");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -552,6 +552,75 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
|
||||
updateMessages(o); // nop // TODO: remove
|
||||
}
|
||||
|
||||
void SHA1_NGCFT1::constructFileMessageInPlace(Message3Handle msg, NGCFT1_file_kind file_kind, ByteSpan file_id) {
|
||||
if (file_kind != NGCFT1_file_kind::HASH_SHA1_INFO) {
|
||||
return;
|
||||
}
|
||||
|
||||
// check if content exists
|
||||
const std::vector<uint8_t> sha1_info_hash{file_id.cbegin(), file_id.cend()};
|
||||
ObjectHandle o;
|
||||
if (_info_to_content.count(sha1_info_hash)) {
|
||||
o = _info_to_content.at(sha1_info_hash);
|
||||
std::cout << "SHA1_NGCFT1: new message has existing content\n";
|
||||
} else {
|
||||
// TODO: backend
|
||||
o = _mfb.newObject(ByteSpan{sha1_info_hash});
|
||||
_info_to_content[sha1_info_hash] = o;
|
||||
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
|
||||
std::cout << "SHA1_NGCFT1: new message has new content\n";
|
||||
}
|
||||
o.get_or_emplace<Components::Messages>().messages.push_back(msg);
|
||||
msg.emplace_or_replace<Message::Components::MessageFileObject>(o);
|
||||
|
||||
// TODO: remove this assumption, this gets very unrelieable with hs
|
||||
if (const auto* from_c_comp = msg.try_get<Message::Components::ContactFrom>(); from_c_comp != nullptr && _cr.valid(from_c_comp->c)) {
|
||||
Contact3Handle c{_cr, from_c_comp->c};
|
||||
// HACK: assume the message sender is participating. usually a safe bet.
|
||||
if (addParticipation(c, o)) {
|
||||
// something happend, update chunk picker
|
||||
assert(static_cast<bool>(c));
|
||||
c.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
}
|
||||
|
||||
// HACK: assume the message sender has all
|
||||
o.get_or_emplace<Components::RemoteHaveBitset>().others[c] = {true, {}};
|
||||
|
||||
// TODO: check if public
|
||||
// since public
|
||||
if (c.all_of<Contact::Components::Parent>()) {
|
||||
// TODO: if this is a dummy contact, should it have parent?
|
||||
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: queue info dl
|
||||
// TODO: queue info/check if we already have info
|
||||
if (!o.all_of<Components::ReRequestInfoTimer>() && !o.all_of<Components::FT1InfoSHA1>()) {
|
||||
bool in_info_want {false};
|
||||
for (const auto it : _queue_content_want_info) {
|
||||
if (it == o) {
|
||||
in_info_want = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!in_info_want) {
|
||||
// TODO: check if already receiving
|
||||
_queue_content_want_info.push_back(o);
|
||||
}
|
||||
} else if (o.all_of<Components::FT1InfoSHA1>()){
|
||||
// remove from info want
|
||||
o.remove<Components::ReRequestInfoTimer>();
|
||||
|
||||
auto it = std::find(_queue_content_want_info.cbegin(), _queue_content_want_info.cend(), o);
|
||||
if (it != _queue_content_want_info.cend()) {
|
||||
_queue_content_want_info.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
_os.throwEventUpdate(o);
|
||||
}
|
||||
|
||||
bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
|
||||
if (!e.e.all_of<ObjComp::Ephemeral::File::ActionTransferAccept>()) {
|
||||
return false;
|
||||
@ -868,7 +937,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
|
||||
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
|
||||
if (!_receiving_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
|
||||
std::cerr << "SHA1_NGCFT1 warning: unknown transfer " << (int)e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n";
|
||||
// not ours
|
||||
//std::cerr << "SHA1_NGCFT1 warning: unknown transfer " << (int)e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -920,7 +990,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
|
||||
|
||||
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
|
||||
if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
|
||||
std::cerr << "SHA1_NGCFT1 error: ngcft1 requested data for unknown transfer\n";
|
||||
// not ours
|
||||
//std::cerr << "SHA1_NGCFT1 error: ngcft1 requested data for unknown transfer\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -1228,6 +1299,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
|
||||
rb.try_emplace(self_c, ts);
|
||||
}
|
||||
|
||||
constructFileMessageInPlace({reg, new_msg_e}, e.file_kind, {e.file_id, e.file_id_size});
|
||||
|
||||
#if 0
|
||||
// check if content exists
|
||||
const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
|
||||
ObjectHandle o;
|
||||
@ -1282,6 +1356,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
|
||||
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
|
||||
|
||||
_os.throwEventUpdate(o);
|
||||
#endif
|
||||
|
||||
_rmm.throwEventConstruct(reg, new_msg_e);
|
||||
|
||||
|
@ -107,6 +107,10 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
||||
|
||||
void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts);
|
||||
|
||||
// construct the file part in a partially constructed message
|
||||
// TODO: maybe return file object?
|
||||
void constructFileMessageInPlace(Message3Handle msg, NGCFT1_file_kind file_kind, ByteSpan file_id);
|
||||
|
||||
protected: // rmm events (actions)
|
||||
bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override;
|
||||
|
||||
|
@ -1,28 +1,95 @@
|
||||
#include "./ngc_hs2_rizzler.hpp"
|
||||
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
#include <solanaceae/message3/registry_message_model.hpp>
|
||||
#include <solanaceae/contact/components.hpp>
|
||||
#include <solanaceae/tox_contacts/components.hpp>
|
||||
#include <solanaceae/tox_contacts/tox_contact_model2.hpp>
|
||||
#include <solanaceae/message3/contact_components.hpp>
|
||||
#include <solanaceae/message3/registry_message_model.hpp>
|
||||
#include <solanaceae/message3/components.hpp>
|
||||
#include <solanaceae/tox_messages/msg_components.hpp>
|
||||
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
|
||||
|
||||
#include <solanaceae/util/span.hpp>
|
||||
|
||||
#include <entt/entity/entity.hpp>
|
||||
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
#include "./serl.hpp"
|
||||
|
||||
#include <cstdint>
|
||||
#include <deque>
|
||||
#include <cstring>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
// TODO: move to own file
|
||||
namespace Components {
|
||||
struct RequestedChatLogs {
|
||||
struct Entry {
|
||||
uint64_t ts_start;
|
||||
uint64_t ts_end;
|
||||
//std::vector<uint8_t> fid; // ?
|
||||
};
|
||||
std::deque<Entry> list;
|
||||
bool contains(uint64_t ts_start, uint64_t ts_end);
|
||||
void addRequest(uint64_t ts_start, uint64_t ts_end);
|
||||
};
|
||||
|
||||
struct RunningChatLogs {
|
||||
struct Entry {
|
||||
uint64_t ts_start;
|
||||
uint64_t ts_end;
|
||||
std::vector<uint8_t> data;
|
||||
float last_activity {0.f};
|
||||
};
|
||||
// list of transfers
|
||||
entt::dense_map<uint8_t, Entry> list;
|
||||
};
|
||||
|
||||
bool RequestedChatLogs::contains(uint64_t ts_start, uint64_t ts_end) {
|
||||
auto it = std::find_if(list.cbegin(), list.cend(), [ts_start, ts_end](const auto& value) {
|
||||
return value.ts_start == ts_start && value.ts_end == ts_end;
|
||||
});
|
||||
return it != list.cend();
|
||||
}
|
||||
|
||||
void RequestedChatLogs::addRequest(uint64_t ts_start, uint64_t ts_end) {
|
||||
if (contains(ts_start, ts_end)) {
|
||||
return; // pre existing
|
||||
}
|
||||
list.push_back(Entry{ts_start, ts_end});
|
||||
}
|
||||
|
||||
} // Components
|
||||
|
||||
// TODO: move to contact reg?
|
||||
static Contact3 findContactByID(Contact3Registry& cr, const std::vector<uint8_t>& id) {
|
||||
// TODO: id lookup table, this is very inefficent
|
||||
for (const auto& [c_it, id_it] : cr.view<Contact::Components::ID>().each()) {
|
||||
if (id == id_it.data) {
|
||||
return c_it;
|
||||
}
|
||||
}
|
||||
|
||||
return entt::null;
|
||||
}
|
||||
|
||||
NGCHS2Rizzler::NGCHS2Rizzler(
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModelI& rmm,
|
||||
ToxContactModel2& tcm,
|
||||
NGCFT1& nft,
|
||||
ToxEventProviderI& tep
|
||||
ToxEventProviderI& tep,
|
||||
SHA1_NGCFT1& sha1_nft
|
||||
) :
|
||||
_cr(cr),
|
||||
_rmm(rmm),
|
||||
_tcm(tcm),
|
||||
_nft(nft),
|
||||
_nftep_sr(_nft.newSubRef(this)),
|
||||
_tep_sr(tep.newSubRef(this))
|
||||
|
||||
_tep_sr(tep.newSubRef(this)),
|
||||
_sha1_nft(sha1_nft)
|
||||
{
|
||||
_nftep_sr
|
||||
.subscribe(NGCFT1_Event::recv_init)
|
||||
@ -46,18 +113,23 @@ float NGCHS2Rizzler::iterate(float delta) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(it->first)) {
|
||||
const Contact3Handle c {_cr, it->first};
|
||||
|
||||
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
|
||||
// peer nolonger online
|
||||
it = _request_queue.erase(it);
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto [group_number, peer_number] = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(it->first);
|
||||
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
|
||||
|
||||
// now in sec
|
||||
const uint64_t ts_now = Message::getTimeMS()/1000;
|
||||
|
||||
if (sendRequest(group_number, peer_number, ts_now, ts_now-(60*60*48))) {
|
||||
const uint64_t ts_start = ts_now;
|
||||
const uint64_t ts_end = ts_now-(60*60*48);
|
||||
|
||||
if (sendRequest(group_number, peer_number, ts_start, ts_end)) {
|
||||
// TODO: requeue
|
||||
// TODO: segment
|
||||
// TODO: dont request already received ranges
|
||||
@ -73,6 +145,8 @@ float NGCHS2Rizzler::iterate(float delta) {
|
||||
|
||||
//std::cout << "ZOX #### requeued request in " << it->second.delay << "s\n";
|
||||
|
||||
auto& rcl = c.get_or_emplace<Components::RequestedChatLogs>();
|
||||
rcl.addRequest(ts_start, ts_end);
|
||||
} else {
|
||||
// on failure, assume disconnected
|
||||
}
|
||||
@ -84,32 +158,6 @@ float NGCHS2Rizzler::iterate(float delta) {
|
||||
return 1000.f;
|
||||
}
|
||||
|
||||
template<typename Type>
|
||||
static uint64_t deserlSimpleType(ByteSpan bytes) {
|
||||
if (bytes.size < sizeof(Type)) {
|
||||
throw int(1);
|
||||
}
|
||||
|
||||
Type value{};
|
||||
|
||||
for (size_t i = 0; i < sizeof(Type); i++) {
|
||||
value |= Type(bytes[i]) << (i*8);
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
static uint64_t deserlTS(ByteSpan ts_bytes) {
|
||||
return deserlSimpleType<uint64_t>(ts_bytes);
|
||||
}
|
||||
|
||||
template<typename Type>
|
||||
static void serlSimpleType(std::vector<uint8_t>& bytes, const Type& value) {
|
||||
for (size_t i = 0; i < sizeof(Type); i++) {
|
||||
bytes.push_back(uint8_t(value >> (i*8) & 0xff));
|
||||
}
|
||||
}
|
||||
|
||||
bool NGCHS2Rizzler::sendRequest(
|
||||
uint32_t group_number, uint32_t peer_number,
|
||||
uint64_t ts_start, uint64_t ts_end
|
||||
@ -132,17 +180,324 @@ bool NGCHS2Rizzler::sendRequest(
|
||||
);
|
||||
}
|
||||
|
||||
void NGCHS2Rizzler::handleMsgPack(Contact3Handle sync_by_c, const std::vector<uint8_t>& data) {
|
||||
assert(sync_by_c);
|
||||
|
||||
auto* reg_ptr = _rmm.get(sync_by_c);
|
||||
if (reg_ptr == nullptr) {
|
||||
std::cerr << "NGCHS2Rizzler error: group without msg reg\n";
|
||||
return;
|
||||
}
|
||||
|
||||
Message3Registry& reg = *reg_ptr;
|
||||
|
||||
uint64_t now_ts = Message::getTimeMS();
|
||||
|
||||
std::cout << "NGCHS2Rizzler: start parsing msgpack chatlog from " << entt::to_integral(sync_by_c.entity()) << "\n";
|
||||
try {
|
||||
const auto j = nlohmann::json::from_msgpack(data);
|
||||
if (!j.is_array()) {
|
||||
std::cerr << "NGCHS2Rizzler error: chatlog not array\n";
|
||||
return;
|
||||
}
|
||||
|
||||
std::cout << "NGCHS2Rizzler: chatlog has " << j.size() << " entries\n";
|
||||
|
||||
for (const auto j_entry : j) {
|
||||
try {
|
||||
// deci seconds
|
||||
uint64_t ts = j_entry.at("ts");
|
||||
// TODO: check against ts range
|
||||
|
||||
ts *= 100; // convert to ms
|
||||
|
||||
const auto& j_ppk = j_entry.at("ppk");
|
||||
|
||||
uint32_t mid = j_entry.at("mid");
|
||||
|
||||
const std::string& type = j_entry.at("msgtype");
|
||||
|
||||
if (
|
||||
!(j_entry.count("text")) &&
|
||||
!(j_entry.count("fkind") && j_entry.count("fid"))
|
||||
) {
|
||||
std::cerr << "NGCHS2Rizzler error: msg neither contains text nor file fields\n";
|
||||
continue;
|
||||
}
|
||||
|
||||
if (
|
||||
type != "text" && type != "action" &&
|
||||
type != "file"
|
||||
) {
|
||||
std::cerr << "NGCHS2Rizzler error: unknown entry '" << j_entry.dump() << "'\n";
|
||||
continue;
|
||||
}
|
||||
|
||||
Contact3 from_c{entt::null};
|
||||
{ // from_c
|
||||
std::vector<uint8_t> id;
|
||||
if (j_ppk.is_binary()) {
|
||||
id = j_ppk.get_binary();
|
||||
} else {
|
||||
j_ppk.at("bytes").get_to(id);
|
||||
}
|
||||
|
||||
from_c = findContactByID(_cr, id);
|
||||
if (!_cr.valid(from_c)) {
|
||||
// create sparse contact with id only
|
||||
from_c = _cr.create();
|
||||
_cr.emplace_or_replace<Contact::Components::ID>(from_c, id);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: from_c perm check
|
||||
// hard to do without numbers
|
||||
|
||||
Message3Handle new_real_msg{reg, reg.create()};
|
||||
|
||||
new_real_msg.emplace<Message::Components::ContactFrom>(from_c);
|
||||
new_real_msg.emplace<Message::Components::ContactTo>(sync_by_c.get<Contact::Components::Parent>().parent);
|
||||
|
||||
new_real_msg.emplace<Message::Components::ToxGroupMessageID>(mid);
|
||||
|
||||
if (type == "text" || type == "action") {
|
||||
bool is_action = type == "action";
|
||||
const std::string& text = j_entry.at("text");
|
||||
|
||||
new_real_msg.emplace<Message::Components::MessageText>(text);
|
||||
|
||||
if (is_action) {
|
||||
new_real_msg.emplace<Message::Components::TagMessageIsAction>();
|
||||
}
|
||||
#if 0
|
||||
std::cout
|
||||
<< "msg ts:" << ts
|
||||
//<< " ppk:" << j_ppk
|
||||
<< " mid:" << mid
|
||||
<< " type:" << type
|
||||
<< " text:" << text
|
||||
<< "\n"
|
||||
;
|
||||
#endif
|
||||
} else if (type == "file") {
|
||||
uint32_t fkind = j_entry.at("fkind");
|
||||
|
||||
const auto& j_fid = j_entry.at("fid");
|
||||
|
||||
std::vector<uint8_t> fid;
|
||||
if (j_fid.is_binary()) {
|
||||
fid = j_fid.get_binary();
|
||||
} else {
|
||||
j_fid.at("bytes").get_to(fid);
|
||||
}
|
||||
|
||||
if (fkind == (uint32_t)NGCFT1_file_kind::HASH_SHA1_INFO) {
|
||||
_sha1_nft.constructFileMessageInPlace(
|
||||
new_real_msg,
|
||||
NGCFT1_file_kind::HASH_SHA1_INFO,
|
||||
ByteSpan{fid}
|
||||
);
|
||||
} else {
|
||||
std::cerr << "NGCHS2Rizzler error: unknown file kind " << fkind << "\n";
|
||||
}
|
||||
|
||||
#if 0
|
||||
std::cout
|
||||
<< "msg ts:" << ts
|
||||
//<< " ppk:" << j_ppk
|
||||
<< " mid:" << mid
|
||||
<< " type:" << type
|
||||
<< " fkind:" << fkind
|
||||
<< " fid:" << j_fid
|
||||
<< "\n"
|
||||
;
|
||||
#endif
|
||||
}
|
||||
|
||||
// now check against pre existing
|
||||
// TODO: dont do this afterwards
|
||||
Message3Handle dup_msg{};
|
||||
{ // check preexisting
|
||||
// get comparator from contact
|
||||
const Contact3Handle reg_c {_cr, reg.ctx().get<Contact3>()};
|
||||
if (reg_c.all_of<Contact::Components::MessageIsSame>()) {
|
||||
auto& comp = reg_c.get<Contact::Components::MessageIsSame>().comp;
|
||||
// walking EVERY existing message OOF
|
||||
// this needs optimizing
|
||||
for (const Message3 other_msg : reg.view<Message::Components::Timestamp, Message::Components::ContactFrom, Message::Components::ContactTo>()) {
|
||||
if (other_msg == new_real_msg) {
|
||||
continue; // skip self
|
||||
}
|
||||
|
||||
if (comp({reg, other_msg}, new_real_msg)) {
|
||||
// dup
|
||||
dup_msg = {reg, other_msg};
|
||||
break;
|
||||
}
|
||||
}
|
||||
} // else, default heuristic??
|
||||
}
|
||||
|
||||
Message3Handle new_msg = new_real_msg;
|
||||
|
||||
if (dup_msg) {
|
||||
reg.destroy(new_msg);
|
||||
new_msg = dup_msg;
|
||||
}
|
||||
|
||||
{ // by whom
|
||||
auto& synced_by = new_msg.get_or_emplace<Message::Components::SyncedBy>().ts;
|
||||
// dont overwrite
|
||||
synced_by.try_emplace(sync_by_c, now_ts);
|
||||
}
|
||||
|
||||
{ // now we also know they got the message
|
||||
auto& list = new_msg.get_or_emplace<Message::Components::ReceivedBy>().ts;
|
||||
// dont overwrite
|
||||
list.try_emplace(sync_by_c, now_ts);
|
||||
}
|
||||
|
||||
if (new_msg == dup_msg) {
|
||||
// TODO: maybe update a timestamp?
|
||||
_rmm.throwEventUpdate(reg, new_msg);
|
||||
} else {
|
||||
// pure new msg
|
||||
|
||||
new_msg.emplace<Message::Components::TimestampProcessed>(now_ts);
|
||||
new_msg.emplace<Message::Components::TimestampWritten>(ts);
|
||||
new_msg.emplace<Message::Components::Timestamp>(ts); // reactive?
|
||||
|
||||
new_msg.emplace<Message::Components::TagUnread>();
|
||||
_rmm.throwEventConstruct(reg, new_msg);
|
||||
}
|
||||
} catch (...) {
|
||||
std::cerr << "NGCHS2Rizzler error: parsing entry '" << j_entry.dump() << "'\n";
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
std::cerr << "NGCHS2Rizzler error: failed parsing data as msgpack\n";
|
||||
}
|
||||
}
|
||||
|
||||
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_init& e) {
|
||||
if (e.file_kind != NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK) {
|
||||
return false; // not for us
|
||||
}
|
||||
|
||||
std::cout << "NGCHS2Rizzler: recv_init " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
|
||||
return false;
|
||||
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
if (!c) {
|
||||
return false; // huh?
|
||||
}
|
||||
|
||||
if (!c.all_of<Components::RequestedChatLogs>()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// parse start end
|
||||
// TODO: extract
|
||||
ByteSpan fid{e.file_id, e.file_id_size};
|
||||
// TODO: better size check
|
||||
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
|
||||
std::cerr << "NGCHS2S error: range not lange enough\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
// seconds
|
||||
uint64_t ts_start{0};
|
||||
uint64_t ts_end{0};
|
||||
|
||||
// parse
|
||||
try {
|
||||
ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)};
|
||||
ts_start = deserlTS(ts_start_bytes);
|
||||
|
||||
ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)};
|
||||
ts_end = deserlTS(ts_end_bytes);
|
||||
} catch (...) {
|
||||
std::cerr << "NGCHS2R error: failed to parse range\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
if (ts_end >= ts_start) {
|
||||
std::cerr << "NGCHS2R error: end not < start\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
auto& reqcl = c.get<Components::RequestedChatLogs>();
|
||||
|
||||
if (!reqcl.contains(ts_start, ts_end)) {
|
||||
// warn?
|
||||
return true;
|
||||
}
|
||||
|
||||
auto& rnncl = c.get_or_emplace<Components::RunningChatLogs>();
|
||||
|
||||
auto& transfer = rnncl.list[e.transfer_id];
|
||||
transfer.data.reserve(e.file_size); // danger?
|
||||
transfer.last_activity = 0.f;
|
||||
transfer.ts_start = ts_start;
|
||||
transfer.ts_end = ts_end;
|
||||
|
||||
e.accept = true;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_data&) {
|
||||
return false;
|
||||
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_data& e) {
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
if (!c) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!c.all_of<Components::RunningChatLogs>()) {
|
||||
return false; // not ours
|
||||
}
|
||||
|
||||
auto& rnncl = c.get<Components::RunningChatLogs>();
|
||||
if (!rnncl.list.count(e.transfer_id)) {
|
||||
return false; // not ours
|
||||
}
|
||||
|
||||
std::cout << "NGCHS2Rizzler: recv_data " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << " " << e.data_size << "@" << e.data_offset << "\n";
|
||||
|
||||
auto& transfer = rnncl.list.at(e.transfer_id);
|
||||
transfer.data.resize(e.data_offset+e.data_size);
|
||||
std::memcpy(&transfer.data[e.data_offset], e.data, e.data_size);
|
||||
|
||||
transfer.last_activity = 0.f;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_done&) {
|
||||
return false;
|
||||
bool NGCHS2Rizzler::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
// TODO: fix disconnect
|
||||
if (!c) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!c.all_of<Components::RunningChatLogs>()) {
|
||||
return false; // not ours
|
||||
}
|
||||
|
||||
auto& rnncl = c.get<Components::RunningChatLogs>();
|
||||
if (!rnncl.list.count(e.transfer_id)) {
|
||||
return false; // not ours
|
||||
}
|
||||
|
||||
std::cout << "NGCHS2Rizzler: recv_done " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
|
||||
{
|
||||
auto& transfer = rnncl.list.at(e.transfer_id);
|
||||
|
||||
// use data
|
||||
// TODO: move out of packet handler
|
||||
handleMsgPack(c, transfer.data);
|
||||
}
|
||||
rnncl.list.erase(e.transfer_id);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool NGCHS2Rizzler::onToxEvent(const Tox_Event_Group_Peer_Join* e) {
|
||||
|
@ -4,11 +4,13 @@
|
||||
#include <solanaceae/toxcore/tox_event_interface.hpp>
|
||||
|
||||
#include <solanaceae/ngc_ft1/ngcft1.hpp>
|
||||
#include <solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp>
|
||||
|
||||
// fwd
|
||||
class ToxContactModel2;
|
||||
class RegistryMessageModelI;
|
||||
|
||||
|
||||
class NGCHS2Rizzler : public ToxEventI, public NGCFT1EventI {
|
||||
Contact3Registry& _cr;
|
||||
RegistryMessageModelI& _rmm;
|
||||
@ -16,6 +18,7 @@ class NGCHS2Rizzler : public ToxEventI, public NGCFT1EventI {
|
||||
NGCFT1& _nft;
|
||||
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
|
||||
ToxEventProviderI::SubscriptionReference _tep_sr;
|
||||
SHA1_NGCFT1& _sha1_nft;
|
||||
|
||||
// 5s-6s
|
||||
const float _delay_before_first_request_min {5.f};
|
||||
@ -39,7 +42,8 @@ class NGCHS2Rizzler : public ToxEventI, public NGCFT1EventI {
|
||||
RegistryMessageModelI& rmm,
|
||||
ToxContactModel2& tcm,
|
||||
NGCFT1& nft,
|
||||
ToxEventProviderI& tep
|
||||
ToxEventProviderI& tep,
|
||||
SHA1_NGCFT1& sha1_nft
|
||||
);
|
||||
|
||||
~NGCHS2Rizzler(void);
|
||||
@ -52,6 +56,8 @@ class NGCHS2Rizzler : public ToxEventI, public NGCFT1EventI {
|
||||
uint64_t ts_start, uint64_t ts_end
|
||||
);
|
||||
|
||||
void handleMsgPack(Contact3Handle c, const std::vector<uint8_t>& data);
|
||||
|
||||
protected:
|
||||
bool onEvent(const Events::NGCFT1_recv_init&) override;
|
||||
bool onEvent(const Events::NGCFT1_recv_data&) override;
|
||||
|
@ -17,28 +17,52 @@
|
||||
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
#include "./serl.hpp"
|
||||
|
||||
#include "./ts_find_start.hpp"
|
||||
|
||||
#include <iostream>
|
||||
|
||||
// https://www.youtube.com/watch?v=AdAqsgga3qo
|
||||
|
||||
// TODO: move to own file
|
||||
namespace Components {
|
||||
|
||||
void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid) {
|
||||
// TODO: do more than exact dedupe
|
||||
for (const auto& [time_range, _] : _queue) {
|
||||
if (time_range.ts_start == new_request.ts_start && time_range.ts_end == new_request.ts_end) {
|
||||
return; // already enqueued
|
||||
// TODO: what about fid?
|
||||
}
|
||||
}
|
||||
struct IncommingTimeRangeRequestQueue {
|
||||
struct Entry {
|
||||
TimeRangeRequest ir;
|
||||
std::vector<uint8_t> fid;
|
||||
};
|
||||
std::deque<Entry> _queue;
|
||||
|
||||
_queue.emplace_back(Entry{
|
||||
new_request,
|
||||
std::vector<uint8_t>{fid.cbegin(), fid.cend()}
|
||||
});
|
||||
}
|
||||
// we should remove/notadd queued requests
|
||||
// that are subsets of same or larger ranges
|
||||
void queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid);
|
||||
};
|
||||
|
||||
struct IncommingTimeRangeRequestRunning {
|
||||
struct Entry {
|
||||
TimeRangeRequest ir;
|
||||
std::vector<uint8_t> data; // transfer data in memory
|
||||
float last_activity {0.f};
|
||||
};
|
||||
entt::dense_map<uint8_t, Entry> _list;
|
||||
};
|
||||
|
||||
void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid) {
|
||||
// TODO: do more than exact dedupe
|
||||
for (const auto& [time_range, _] : _queue) {
|
||||
if (time_range.ts_start == new_request.ts_start && time_range.ts_end == new_request.ts_end) {
|
||||
return; // already enqueued
|
||||
// TODO: what about fid?
|
||||
}
|
||||
}
|
||||
|
||||
_queue.emplace_back(Entry{
|
||||
new_request,
|
||||
std::vector<uint8_t>{fid.cbegin(), fid.cend()}
|
||||
});
|
||||
}
|
||||
|
||||
} // Components
|
||||
|
||||
@ -167,29 +191,6 @@ float NGCHS2Sigma::iterate(float delta) {
|
||||
return 1000.f;
|
||||
}
|
||||
|
||||
template<typename Type>
|
||||
static uint64_t deserlSimpleType(ByteSpan bytes) {
|
||||
if (bytes.size < sizeof(Type)) {
|
||||
throw int(1);
|
||||
}
|
||||
|
||||
Type value{};
|
||||
|
||||
for (size_t i = 0; i < sizeof(Type); i++) {
|
||||
value |= Type(bytes[i]) << (i*8);
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
//static uint32_t deserlMID(ByteSpan mid_bytes) {
|
||||
// return deserlSimpleType<uint32_t>(mid_bytes);
|
||||
//}
|
||||
|
||||
static uint64_t deserlTS(ByteSpan ts_bytes) {
|
||||
return deserlSimpleType<uint64_t>(ts_bytes);
|
||||
}
|
||||
|
||||
void NGCHS2Sigma::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
|
||||
ByteSpan fid{e.file_id, e.file_id_size};
|
||||
// TODO: better size check
|
||||
@ -333,6 +334,7 @@ std::vector<uint8_t> NGCHS2Sigma::buildChatLogFileRange(Contact3Handle c, uint64
|
||||
|
||||
// HACK: use tox fild_id and file_kind instead!!
|
||||
if (o.all_of<Components::FT1InfoSHA1Hash>()) {
|
||||
j_entry["msgtype"] = "file";
|
||||
j_entry["fkind"] = NGCFT1_file_kind::HASH_SHA1_INFO;
|
||||
j_entry["fid"] = nlohmann::json::binary_t{o.get<Components::FT1InfoSHA1Hash>().hash};
|
||||
} else {
|
||||
|
@ -23,30 +23,6 @@ struct TimeRangeRequest {
|
||||
uint64_t ts_end{0};
|
||||
};
|
||||
|
||||
// TODO: move to own file
|
||||
namespace Components {
|
||||
struct IncommingTimeRangeRequestQueue {
|
||||
struct Entry {
|
||||
TimeRangeRequest ir;
|
||||
std::vector<uint8_t> fid;
|
||||
};
|
||||
std::deque<Entry> _queue;
|
||||
|
||||
// we should remove/notadd queued requests
|
||||
// that are subsets of same or larger ranges
|
||||
void queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid);
|
||||
};
|
||||
|
||||
struct IncommingTimeRangeRequestRunning {
|
||||
struct Entry {
|
||||
TimeRangeRequest ir;
|
||||
std::vector<uint8_t> data; // transfer data in memory
|
||||
float last_activity {0.f};
|
||||
};
|
||||
entt::dense_map<uint8_t, Entry> _list;
|
||||
};
|
||||
} // Components
|
||||
|
||||
class NGCHS2Sigma : public RegistryMessageModelEventI, public NGCFT1EventI {
|
||||
Contact3Registry& _cr;
|
||||
RegistryMessageModelI& _rmm;
|
||||
|
32
solanaceae/ngc_hs2/serl.hpp
Normal file
32
solanaceae/ngc_hs2/serl.hpp
Normal file
@ -0,0 +1,32 @@
|
||||
#pragma once
|
||||
|
||||
#include <solanaceae/util/span.hpp>
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
template<typename Type>
|
||||
static uint64_t deserlSimpleType(ByteSpan bytes) {
|
||||
if (bytes.size < sizeof(Type)) {
|
||||
throw int(1);
|
||||
}
|
||||
|
||||
Type value{};
|
||||
|
||||
for (size_t i = 0; i < sizeof(Type); i++) {
|
||||
value |= Type(bytes[i]) << (i*8);
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
static uint64_t deserlTS(ByteSpan ts_bytes) {
|
||||
return deserlSimpleType<uint64_t>(ts_bytes);
|
||||
}
|
||||
|
||||
template<typename Type>
|
||||
static void serlSimpleType(std::vector<uint8_t>& bytes, const Type& value) {
|
||||
for (size_t i = 0; i < sizeof(Type); i++) {
|
||||
bytes.push_back(uint8_t(value >> (i*8) & 0xff));
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user