#include "./ngc_hs2_send.hpp" #include #include #include #include #include #include //#include // TODO: this is kinda bad, needs improvement // use tox fileid/filekind instead ! #include #include #include #include "./ts_find_start.hpp" #include // https://www.youtube.com/watch?v=AdAqsgga3qo namespace Components { void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request) { // TODO: do more than exact dedupe for (const auto& [ts_start, ts_end] : _queue) { if (ts_start == new_request.ts_start && ts_end == new_request.ts_end) { return; // already enqueued } } _queue.push_back(new_request); } } // Components NGCHS2Send::NGCHS2Send( Contact3Registry& cr, RegistryMessageModelI& rmm, ToxContactModel2& tcm, NGCFT1& nft ) : _cr(cr), _rmm(rmm), _tcm(tcm), _nft(nft), _nftep_sr(_nft.newSubRef(this)) { _nftep_sr .subscribe(NGCFT1_Event::recv_request) .subscribe(NGCFT1_Event::send_data) .subscribe(NGCFT1_Event::send_done) ; } NGCHS2Send::~NGCHS2Send(void) { } float NGCHS2Send::iterate(float delta) { // limit how often we update here (new fts usually) if (_iterate_heat > 0.f) { _iterate_heat -= delta; return 1000.f; } else { _iterate_heat = _iterate_cooldown; } // work request queue // check if already running, discard auto fn_iirq = [this](auto&& view) { for (auto&& [cv, iirq] : view.each()) { Contact3Handle c{_cr, cv}; auto& iirr = c.get_or_emplace(); // dedup queued from running if (iirr._list.size() >= _max_parallel_per_peer) { continue; } // new ft here? } }; // first handle range requests on weak self //for (auto&& [cv, iirq] : _cr.view().each()) { fn_iirq(_cr.view()); // we could stop here, if too much is already running // then range on others //for (auto&& [cv, iirq] : _cr.view(entt::exclude_t{}).each()) { fn_iirq(_cr.view(entt::exclude_t{})); return 1000.f; } template static uint64_t deserlSimpleType(ByteSpan bytes) { if (bytes.size < sizeof(Type)) { throw int(1); } Type value; for (size_t i = 0; i < sizeof(Type); i++) { value |= Type(bytes[i]) << (i*8); } return value; } static uint32_t deserlMID(ByteSpan mid_bytes) { return deserlSimpleType(mid_bytes); } static uint64_t deserlTS(ByteSpan ts_bytes) { return deserlSimpleType(ts_bytes); } void NGCHS2Send::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) { ByteSpan fid{e.file_id, e.file_id_size}; // TODO: better size check if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) { std::cerr << "NGCHS2S error: range not lange enough\n"; return; } // seconds uint64_t ts_start{0}; uint64_t ts_end{0}; // parse try { ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)}; ts_start = deserlTS(ts_start_bytes); ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)}; ts_end = deserlTS(ts_end_bytes); } catch (...) { std::cerr << "NGCHS2S error: failed to parse range\n"; return; } // dedupe insert into queue // how much overlap do we allow? 
	c.get_or_emplace<Components::IncommingTimeRangeRequestQueue>().queueRequest({
		ts_start,
		ts_end,
	});
}

#if 0
void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
	ByteSpan fid{e.file_id, e.file_id_size};
	// TODO: better size check
	if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) {
		std::cerr << "NGCHS2S error: singlemessage not long enough\n";
		return;
	}

	ByteSpan ppk;
	uint32_t mid {0};
	uint64_t ts {0}; // deciseconds

	// parse
	try {
		// - ppk
		// TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32)
		ppk = {fid.ptr, 32};

		// - mid
		ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)};
		mid = deserlMID(mid_bytes);

		// - ts
		ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)};
		ts = deserlTS(ts_bytes);
	} catch (...) {
		std::cerr << "NGCHS2S error: failed to parse singlemessage\n";
		return;
	}

	// for queue, we need group, peer, msg_ppk, msg_mid, msg_ts

	// dedupe insert into queue
	c.get_or_emplace<Components::IncommingMsgRequestQueue>().queueRequest({
		ppk,
		mid,
		ts,
	});
}
#endif

std::vector<uint8_t> NGCHS2Send::buildHSFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end) {
	const Message3Registry* reg_ptr = static_cast<const RegistryMessageModelI&>(_rmm).get(c);
	if (reg_ptr == nullptr) {
		return {};
	}
	const Message3Registry& msg_reg = *reg_ptr;

	if (msg_reg.storage<Message::Components::Timestamp>() == nullptr) {
		// nothing to do here
		return {};
	}

	//std::cout << "!!!! starting msg ts search, ts_start:" << ts_start << " ts_end:" << ts_end << "\n";

	auto ts_view = msg_reg.view<Message::Components::Timestamp>();

	// we iterate "forward", so from newest to oldest

	// start is the newest ts
	const auto ts_start_it = find_start_by_ts(ts_view, ts_start);
	// end is the oldest ts
	// we only search for the start point, because we walk to the end anyway

	auto j_array = nlohmann::json::array_t{};

	// hmm
	// maybe use other view or something?
	for (auto it = ts_start_it; it != ts_view.end(); it++) {
		const auto e = *it;
		const auto& [ts_comp] = ts_view.get(e);

		if (ts_comp.ts > ts_start) {
			std::cerr << "!!!! msg ent in view too new\n";
			continue;
		} else if (ts_comp.ts < ts_end) {
			// too old, we hit the end of the range
			break;
		}

		if (!msg_reg.all_of<
			Message::Components::ContactFrom,
			Message::Components::ContactTo,
			Message::Components::ToxGroupMessageID
		>(e)) {
			continue; // ??
		}

		if (!msg_reg.any_of<Message::Components::MessageText, Message::Components::MessageFileObject>(e)) {
			continue; // skip
		}

		const auto& [c_from_c, c_to_c] = msg_reg.get<Message::Components::ContactFrom, Message::Components::ContactTo>(e);

		if (c_to_c.c != c) {
			// message was not public
			continue;
		}

		if (!_cr.valid(c_from_c.c)) {
			continue; // ???
		}

		Contact3Handle c_from{_cr, c_from_c.c};

		if (!c_from.all_of<Contact::Components::ToxGroupPeerPersistent>()) {
			continue; // ???
		}

		if (_only_send_self_observed && msg_reg.all_of<Message::Components::ReceivedBy>(e) && c.all_of<Contact::Components::Self>()) {
			if (!msg_reg.get<Message::Components::ReceivedBy>(e).ts.count(c.get<Contact::Components::Self>().self)) {
				continue; // did not observe ourselves, skip
			}
		}

		auto j_entry = nlohmann::json::object_t{};

		j_entry["ts"] = ts_comp.ts/100; // millisec -> decisec
		{
			const auto& ppk_ref = c_from.get<Contact::Components::ToxGroupPeerPersistent>().peer_key.data;
			j_entry["ppk"] = nlohmann::json::binary_t{std::vector<uint8_t>{ppk_ref.cbegin(), ppk_ref.cend()}};
		}
		j_entry["mid"] = msg_reg.get<Message::Components::ToxGroupMessageID>(e).id;

		if (msg_reg.all_of<Message::Components::MessageText>(e)) {
			if (msg_reg.all_of<Message::Components::TagMessageIsAction>(e)) {
				j_entry["msgtype"] = "action"; // TODO: textaction?
			} else {
				j_entry["msgtype"] = "text";
			}
			j_entry["text"] = msg_reg.get<Message::Components::MessageText>(e).text;
		} else if (msg_reg.any_of<Message::Components::MessageFileObject>(e)) {
			const auto& o = msg_reg.get<Message::Components::MessageFileObject>(e).o;
			if (!o) {
				continue;
			}

			// HACK: use tox file_id and file_kind instead!!
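			// for file messages only the file kind and the sha1 info hash are
			// serialized (fkind + fid); anything else is skipped below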
			if (o.all_of<Components::FT1InfoSHA1Hash>()) {
				j_entry["fkind"] = NGCFT1_file_kind::HASH_SHA1_INFO;
				j_entry["fid"] = nlohmann::json::binary_t{o.get<Components::FT1InfoSHA1Hash>().hash};
			} else {
				continue; // unknown file type
			}
		}

		j_array.push_back(j_entry);
	}

	return nlohmann::json::to_msgpack(j_array);
}

bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) {
	return false;
}

bool NGCHS2Send::onEvent(const Message::Events::MessageUpdated&) {
	return false;
}

bool NGCHS2Send::onEvent(const Message::Events::MessageDestory&) {
	return false;
}

bool NGCHS2Send::onEvent(const Events::NGCFT1_recv_request& e) {
	if (
		e.file_kind != NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK
	) {
		return false; // not for us
	}

	// TODO: when is it done from queue?

	auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
	if (!c) {
		return false; // how
	}

	// is other peer allowed to make requests
	//bool quick_allow {false};
	bool quick_allow {true}; // HACK: disable all restrictions for this early test
	// TODO: quick deny?
	{
		// - tagged as weakself
		if (!quick_allow && c.all_of<Contact::Components::TagSelfWeak>()) {
			quick_allow = true;
		}

		// - sub perm level??

		// - out of max time range (ft specific, not a quick_allow)
	}

	if (e.file_kind == NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK) {
		handleTimeRange(c, e);
	}

	return true;
}

bool NGCHS2Send::onEvent(const Events::NGCFT1_send_data&) {
	return false;
}

bool NGCHS2Send::onEvent(const Events::NGCFT1_send_done&) {
	return false;
}
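
// Illustrative sketch: how a requesting peer could assemble the 16 byte
// file_id that handleTimeRange() expects, mirroring the little-endian layout
// deserlSimpleType() decodes (ts_start first, ts_end second).
// serlSimpleType/buildRangeFileID are hypothetical helpers, not used by NGCHS2Send.
#if 0
template<typename Type>
static void serlSimpleType(std::vector<uint8_t>& bytes, Type value) {
	for (size_t i = 0; i < sizeof(Type); i++) {
		bytes.push_back(uint8_t((value >> (i*8)) & 0xff));
	}
}

static std::vector<uint8_t> buildRangeFileID(uint64_t ts_start, uint64_t ts_end) {
	std::vector<uint8_t> file_id;
	file_id.reserve(2*sizeof(uint64_t));
	serlSimpleType(file_id, ts_start); // newest timestamp first
	serlSimpleType(file_id, ts_end); // oldest timestamp second
	return file_id;
}
#endif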