#include "./ngc_hs.hpp" #include #include #include #include #include #include #include #include #include #include #include #include void ZoxNGCHistorySync::subscribeToEvents(void) { _zngcepi.subscribe(this, ZoxNGC_Event::ngch_request); _zngcepi.subscribe(this, ZoxNGC_Event::ngch_syncmsg); _tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_JOIN); } ZoxNGCHistorySync::ZoxNGCHistorySync(ToxEventProviderI& tep, ZoxNGCEventProviderI& zngcepi, ToxI& t, Contact3Registry& cr, ToxContactModel2& tcm, RegistryMessageModel& rmm) : _tep(tep), _zngcepi(zngcepi), _t(t), _cr(cr), _tcm(tcm), _rmm(rmm), _rng(std::random_device{}()) { subscribeToEvents(); } void ZoxNGCHistorySync::tick(float delta) { // send queued requests for (auto it = _request_queue.begin(); it != _request_queue.end();) { it->second.timer += delta; if (it->second.timer >= it->second.delay) { if (!_cr.all_of(it->first)) { // peer nolonger online it = _request_queue.erase(it); continue; } const auto [group_number, peer_number] = _cr.get(it->first); if (sendRequest(group_number, peer_number, it->second.sync_delta)) { // on success, requeue with longer delay (minutes) it->second.timer = 0.f; it->second.delay = _delay_next_request_min + _rng_dist(_rng)*_delay_next_request_add; // double the delay for overlap (9m-15m) // TODO: finetune it->second.sync_delta = uint8_t((it->second.delay/60.f)*2.f) + 1; std::cout << "ZOX #### requeued request in " << it->second.delay << "s\n"; it++; } else { // on failure, assume disconnected it = _request_queue.erase(it); } } else { it++; } } for (auto it = _sync_queue.begin(); it != _sync_queue.end();) { it->second.timer += delta; if (it->second.timer >= it->second.delay) { it->second.timer = 0.f; Message3 msg_e = it->second.ents.front(); it->second.ents.pop(); if (!_cr.all_of(it->first)) { // peer nolonger online it = _sync_queue.erase(it); continue; } const auto [group_number, peer_number] = _cr.get(it->first); auto* reg_ptr = _rmm.get(it->first); if (reg_ptr == nullptr) { //std::cout << "°°°°°°°° no reg for contact\n"; it = _sync_queue.erase(it); continue; } Message3Registry& reg = *reg_ptr; if (!reg.valid(msg_e)) { std::cerr << "ZOX NGCHS error: invalid message in sync send queue\n"; it = _sync_queue.erase(it); continue; } const auto& msg_sender = reg.get(msg_e).c; if (!_cr.all_of(msg_sender)) { std::cerr << "ZOX NGCHS error: msg sender without persistant\n"; continue; } //if (auto peer_persist_opt = _cm.toPersistent(msg_sender); peer_persist_opt.has_value() && std::holds_alternative(peer_persist_opt.value())) { // get name for peer // TODO: make sure there is no alias leaked //const auto msg_sender_name = _cm.getContactName(msg_sender); std::string_view msg_sender_name; if (_cr.all_of(msg_sender)) { msg_sender_name = _cr.get(msg_sender).name; } if (!sendSyncMessage( group_number, peer_number, reg.get(msg_e).id, _cr.get(msg_sender).peer_key.data, std::chrono::duration_cast(std::chrono::milliseconds{reg.get(msg_e).ts}).count(), msg_sender_name, reg.get(msg_e).text ) || it->second.ents.empty()) { it = _sync_queue.erase(it); continue; } } it++; } } bool ZoxNGCHistorySync::sendRequest( uint32_t group_number, uint32_t peer_number, uint8_t sync_delta ) { std::vector packet; { // magic //0x667788113435 packet.push_back(0x66); packet.push_back(0x77); packet.push_back(0x88); packet.push_back(0x11); packet.push_back(0x34); packet.push_back(0x35); } packet.push_back(0x01); // version packet.push_back(0x01); // pkt_id packet.push_back(sync_delta); auto ret = _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, packet); // TODO: log error return ret == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; } bool ZoxNGCHistorySync::sendSyncMessage( uint32_t group_number, uint32_t peer_number, uint32_t message_id, const std::array& sender_pub_key, uint32_t timestamp, std::string_view sender_name, std::string_view message_text ) { std::vector packet; { // magic //0x667788113435 packet.push_back(0x66); packet.push_back(0x77); packet.push_back(0x88); packet.push_back(0x11); packet.push_back(0x34); packet.push_back(0x35); } packet.push_back(0x01); // version packet.push_back(0x02); // pkt_id // 4 bytes, message id packet.push_back(0xff & (message_id >> 8*3)); packet.push_back(0xff & (message_id >> 8*2)); packet.push_back(0xff & (message_id >> 8*1)); packet.push_back(0xff & (message_id >> 8*0)); // 32 bytes, sender pub key packet.insert(packet.end(), sender_pub_key.cbegin(), sender_pub_key.cend()); // 4 bytes, timestamp packet.push_back(0xff & (timestamp >> 8*3)); packet.push_back(0xff & (timestamp >> 8*2)); packet.push_back(0xff & (timestamp >> 8*1)); packet.push_back(0xff & (timestamp >> 8*0)); // 25 bytes, sender name, truncated/filled with 0 // TODO: handle unicode properly for (size_t i = 0; i < 25; i++) { if (i < sender_name.size()) { packet.push_back(sender_name.at(i)); } else { packet.push_back('\0'); } } // up to 39927 bytes, message #if 0 packet.insert(packet.end(), message_text.cbegin(), message_text.cend()); #else //const int64_t msg_max_possible_size = _t.toxGroup // TODO: make pr and add functions const uint64_t msg_max_possible_size = std::clamp( TOX_GROUP_MAX_CUSTOM_LOSSLESS_PACKET_LENGTH - packet.size(), 0, // low 39927 // high ); for (size_t i = 0; i < msg_max_possible_size && i < message_text.size(); i++) { packet.push_back(message_text.at(i)); } #endif auto ret = _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, packet); // TODO: log error return ret == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; } bool ZoxNGCHistorySync::onEvent(const Events::ZoxNGC_ngch_request& e) { std::cout << "ZOX ngch_request" << " grp:" << e.group_number << " per:" << e.peer_number << " prv:" << e._private << " sdl:" << (int)e.sync_delta << "\n"; // if blacklisted / on cool down const auto request_sender = _tcm.getContactGroupPeer(e.group_number, e.peer_number); if (_sync_queue.count(request_sender)) { std::cerr << "ZNGCHS waring: ngch_request but still in sync send queue\n"; return true; } // const -> dont create (this is a request for existing messages) auto* reg_ptr = static_cast(_rmm).get(request_sender); if (reg_ptr == nullptr) { std::cerr << "ZNGCHS error: group without reg\n"; return true; } const Message3Registry& reg = *reg_ptr; std::queue msg_send_queue; // convert sync delta to ms const int64_t sync_delta_offset_ms = int64_t(e.sync_delta) * 1000 * 60; const uint64_t ts_start = Message::getTimeMS() - sync_delta_offset_ms; auto view = reg.view(); view.use(); view.each([&](const Message3 e, const auto&, const auto& c_t, const auto& ts, const auto&, const auto&) { // private if (!_cr.all_of(c_t.c)) { return; } if (ts.ts < ts_start) { //std::cout << "---- " << ts.ts << " < " << ts_start << " -> too old\n"; return; } if (reg.all_of(e)) { const auto& list = reg.get(e).ts; // TODO: optimize with list.contains(self); if ( std::find_if( list.cbegin(), list.cend(), [this](const auto&& it) { // TODO: add weak self return _cr.all_of(it.first); } ) == list.cend() ) { // self not found // TODO: config for self only return; } } std::cout << "---- " << ts.ts << " >= " << ts_start << " -> selected\n"; msg_send_queue.push(e); }); std::cout << "ZOX ngch_request selected " << msg_send_queue.size() << " messages\n"; if (!msg_send_queue.empty()) { _sync_queue[request_sender] = SyncQueueInfo{ _delay_between_syncs_min + _rng_dist(_rng)*_delay_between_syncs_add, 0.f, std::move(msg_send_queue) }; } return true; } bool ZoxNGCHistorySync::onEvent(const Events::ZoxNGC_ngch_syncmsg& e) { std::cout << "ZOX ngch_syncmsg" // who sent the syncmsg << " grp:" << e.group_number << " per:" << e.peer_number << " prv:" << e._private // its contents << " mid:" << e.message_id << " spk:" << std::hex << (uint16_t)e.sender_pub_key[0] << (uint16_t)e.sender_pub_key[1] << std::dec << " ts:" << e.timestamp << " snm:" << e.sender_name << " txt:" << e.message_text << "\n"; auto sync_by_c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); assert(static_cast(sync_by_c)); auto* reg_ptr = _rmm.get(sync_by_c); if (reg_ptr == nullptr) { std::cerr << "ZNGCHS error: group without reg\n"; return false; } Message3Registry& reg = *reg_ptr; const auto sync_c = _tcm.getContactGroupPeer(e.group_number, ToxKey{e.sender_pub_key.data(), e.sender_pub_key.size()}); assert(static_cast(sync_c)); // TODO: make conditional // convert to ms uint64_t sync_ts = std::chrono::milliseconds(std::chrono::seconds{e.timestamp}).count(); // o.o uint64_t now_ts = Message::getTimeMS(); const uint64_t max_future_ms = 1u*60u*1000u; // accept up to 1 minute into the future if (sync_ts - max_future_ms > now_ts) { // message is too far into the future std::cerr << "ZNGCHS error: message ts was too far into the future\n"; return true; // false? keep handled? } // find matches Message3 matching_e = entt::null; { auto view = reg.view(); view.use(); for (const auto ent : view) { if (view.get(ent).id != e.message_id) { continue; } // how far apart the 2 timestamps can be, before they are considered different messages if (std::abs(int64_t(view.get(ent).ts) - int64_t(sync_ts)) > _max_age_difference_ms) { std::cout << "ZOX NGCHS info: same message id, but different timestamp\n"; continue; } const auto& ent_c = view.get(ent).c; if (!(ent_c == sync_c)) { std::cout << "ZOX NGCHS info: same message id, but different sender\n"; continue; } matching_e = ent; break; // TODO: matching list } } if (reg.valid(matching_e)) { // TODO: do something else, like average?, trust mods more? const bool has_tw = reg.all_of(matching_e); auto& msg_ts_w = reg.get_or_emplace(matching_e, sync_ts); if (has_tw) { if (msg_ts_w.ts > sync_ts) { msg_ts_w.ts = sync_ts; reg.emplace_or_replace(matching_e, sync_ts); _rmm.throwEventUpdate(reg, matching_e); } } else { // TODO: actually, dont do anything? _rmm.throwEventUpdate(reg, matching_e); } } else { // tmp, assume message new matching_e = reg.create(); reg.emplace(matching_e, sync_c); reg.emplace(matching_e, sync_by_c.get().parent); reg.emplace(matching_e, e.message_id); reg.emplace(matching_e, e.message_text); reg.emplace(matching_e, now_ts); reg.emplace(matching_e, sync_ts); reg.emplace(matching_e, sync_ts); // reactive? reg.emplace(matching_e); _rmm.throwEventConstruct(reg, matching_e); } { // by whom auto& synced_by = reg.get_or_emplace(matching_e).ts; // dont overwrite synced_by.try_emplace(sync_by_c, now_ts); // TODO: throw update? } { // now we also know they got the message auto& list = reg.get_or_emplace(matching_e).ts; // dont overwrite list.try_emplace(sync_by_c, now_ts); // TODO: throw update? } return true; } bool ZoxNGCHistorySync::onToxEvent(const Tox_Event_Group_Peer_Join* e) { const auto group_number = tox_event_group_peer_join_get_group_number(e); const auto peer_number = tox_event_group_peer_join_get_peer_id(e); const auto c = _tcm.getContactGroupPeer(group_number, peer_number); if (!_request_queue.count(c)) { _request_queue[c] = { _delay_before_first_request_min + _rng_dist(_rng)*_delay_before_first_request_add, 0.f, 130u // TODO: magic number }; } return false; }