Compare commits

..

No commits in common. "34dc01d4dcdab3591c100456c301784b4aeac3c7" and "37b92f67c8cd7e7668a29311261fbb2ab25d9bc9" have entirely different histories.

3 changed files with 20 additions and 238 deletions

View File

@ -133,7 +133,6 @@ void ChunkPicker::updateParticipation(
entt::dense_set<Object> checked; entt::dense_set<Object> checked;
for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) { for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) {
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
const ObjectHandle o {objreg, ov}; const ObjectHandle o {objreg, ov};
if (participating_unfinished.contains(o)) { if (participating_unfinished.contains(o)) {
@ -151,21 +150,6 @@ void ChunkPicker::updateParticipation(
participating_unfinished.erase(o); participating_unfinished.erase(o);
continue; continue;
} }
// TODO: optimize this to only change on dirty, or something
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
Priority prio = o.get<ObjComp::Ephemeral::File::DownloadPriority>().p;
uint16_t pskips =
prio == Priority::HIGHEST ? 0u :
prio == Priority::HIGH ? 1u :
prio == Priority::NORMAL ? 2u :
prio == Priority::LOW ? 4u :
8u // LOWEST
;
participating_unfinished.at(o).should_skip = pskips;
}
} else { } else {
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) { if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
continue; continue;
@ -176,6 +160,8 @@ void ChunkPicker::updateParticipation(
} }
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) { if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
//using Priority = Components::DownloadPriority::Priority;
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
Priority prio = Priority::NORMAL; Priority prio = Priority::NORMAL;
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) { if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
@ -253,12 +239,13 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
// round robin content (remember last obj) // round robin content (remember last obj)
if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) { if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
participating_in_last = participating_unfinished.begin()->first; participating_in_last = participating_unfinished.begin()->first;
//participating_in_last = *participating_unfinished.begin();
} }
assert(objreg.valid(participating_in_last)); assert(objreg.valid(participating_in_last));
auto it = participating_unfinished.find(participating_in_last); auto it = participating_unfinished.find(participating_in_last);
// hard limit robin rounds to array size time 20 // hard limit robin rounds to array size time 20
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++, it++) { for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++) {
if (it == participating_unfinished.end()) { if (it == participating_unfinished.end()) {
it = participating_unfinished.begin(); it = participating_unfinished.begin();
} }
@ -267,7 +254,6 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
it->second.skips++; it->second.skips++;
continue; continue;
} }
it->second.skips = 0;
ObjectHandle o {objreg, it->first}; ObjectHandle o {objreg, it->first};
@ -375,8 +361,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
} }
} }
//if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) { if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) {
if (it == participating_unfinished.end()) {
participating_in_last = entt::null; participating_in_last = entt::null;
} else { } else {
participating_in_last = it->first; participating_in_last = it->first;

View File

@ -6,35 +6,8 @@
#include <solanaceae/contact/components.hpp> #include <solanaceae/contact/components.hpp>
#include <iostream>
// https://www.youtube.com/watch?v=AdAqsgga3qo // https://www.youtube.com/watch?v=AdAqsgga3qo
namespace Components {
void IncommingInfoRequestQueue::queueRequest(const InfoRequest& new_request) {
// TODO: do more than exact dedupe
for (const auto& [ts_start, ts_end] : _queue) {
if (ts_start == new_request.ts_start && ts_end == new_request.ts_end) {
return; // already enqueued
}
}
_queue.push_back(new_request);
}
void IncommingMsgRequestQueue::queueRequest(const SingleMessageRequest& new_request) {
for (const auto& [ppk, mid, ts] : _queue) {
if (mid == new_request.mid && ts == new_request.ts && ppk == new_request.ppk) {
return; // already enqueued
}
}
_queue.push_back(new_request);
}
} // Components
NGCHS2Send::NGCHS2Send( NGCHS2Send::NGCHS2Send(
Contact3Registry& cr, Contact3Registry& cr,
RegistryMessageModelI& rmm, RegistryMessageModelI& rmm,
@ -61,152 +34,41 @@ NGCHS2Send::~NGCHS2Send(void) {
} }
float NGCHS2Send::iterate(float delta) { float NGCHS2Send::iterate(float delta) {
// limit how often we update here (new fts usually)
if (_iterate_heat > 0.f) {
_iterate_heat -= delta;
return 1000.f;
} else {
_iterate_heat = _iterate_cooldown;
}
// work request queue // work request queue
// check if already running, discard // check if already running, discard
auto fn_iirq = [this](auto&& view) {
for (auto&& [cv, iirq] : view.each()) {
Contact3Handle c{_cr, cv};
auto& iirr = c.get_or_emplace<Components::IncommingInfoRequestRunning>();
// dedup queued from running
if (iirr._list.size() >= _max_parallel_per_peer) {
continue;
}
// new ft here?
}
};
auto fn_imrq = [this](auto&& view) {
for (auto&& [cv, imrq] : view.each()) {
Contact3Handle c{_cr, cv};
auto& imrr = c.get_or_emplace<Components::IncommingMsgRequestRunning>();
// dedup queued from running
if (imrr._list.size() >= _max_parallel_per_peer) {
continue;
}
// new ft here?
}
};
// first handle range requests on weak self
//for (auto&& [cv, iirq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>().each()) {
fn_iirq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>());
// then handle messages on weak self
//for (auto&& [cv, imrq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>().each()) {
fn_imrq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>());
// we could stop here, if too much is already running
// then range on others
//for (auto&& [cv, iirq] : _cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
fn_iirq(_cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
// then messages on others
//for (auto&& [cv, imrq] : _cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
fn_imrq(_cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
return 1000.f; return 1000.f;
} }
template<typename Type>
static uint64_t deserlSimpleType(ByteSpan bytes) {
if (bytes.size < sizeof(Type)) {
throw int(1);
}
Type value;
for (size_t i = 0; i < sizeof(Type); i++) {
value |= Type(bytes[i]) << (i*8);
}
return value;
}
static uint32_t deserlMID(ByteSpan mid_bytes) {
return deserlSimpleType<uint32_t>(mid_bytes);
}
static uint64_t deserlTS(ByteSpan ts_bytes) {
return deserlSimpleType<uint64_t>(ts_bytes);
}
void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) { void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
ByteSpan fid{e.file_id, e.file_id_size}; ByteSpan fid{e.file_id, e.file_id_size};
// TODO: better size check
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
std::cerr << "NGCHS2S error: range not lange enough\n";
return;
}
// seconds
uint64_t ts_start{0};
uint64_t ts_end{0};
// parse // parse
try { // - ts start
ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)}; // - ts end
ts_start = deserlTS(ts_start_bytes);
ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)};
ts_end = deserlTS(ts_end_bytes);
} catch (...) {
std::cerr << "NGCHS2S error: failed to parse range\n";
return;
}
// dedupe insert into queue // dedupe insert into queue
// how much overlap do we allow?
c.get_or_emplace<Components::IncommingInfoRequestQueue>().queueRequest({
ts_start,
ts_end,
});
} }
void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) { void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
ByteSpan fid{e.file_id, e.file_id_size}; ByteSpan fid{e.file_id, e.file_id_size};
// TODO: better size check
if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) { if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) {
std::cerr << "NGCHS2S error: singlemessage not lange enough\n"; // error
return; return;
} }
ByteSpan ppk;
uint32_t mid {0};
uint64_t ts {0}; // deciseconds
// parse // parse
try { // - ppk
// - ppk // TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32)
// TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32) ByteSpan ppk{fid.ptr, 32};
ppk = {fid.ptr, 32};
// - mid // - mid
ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)}; //static_assert(sizeof(Tox_Group_Message_Id) == sizeof(uint32_t));
mid = deserlMID(mid_bytes); ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)};
// - ts // - ts
ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)}; // uint64_t (seconds? we dont want milliseconds
ts = deserlTS(ts_bytes); ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)};
} catch (...) {
std::cerr << "NGCHS2S error: failed to parse singlemessage\n";
return;
}
// file content // file content
// - message type (text/textaction/file(ft1sha1)) // - message type (text/textaction/file(ft1sha1))
@ -219,11 +81,6 @@ void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv
// for queue, we need group, peer, msg_ppk, msg_mid, msg_ts // for queue, we need group, peer, msg_ppk, msg_mid, msg_ts
// dedupe insert into queue // dedupe insert into queue
c.get_or_emplace<Components::IncommingMsgRequestQueue>().queueRequest({
ppk,
mid,
ts,
});
} }
bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) { bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) {

View File

@ -9,60 +9,10 @@
#include <entt/container/dense_map.hpp> #include <entt/container/dense_map.hpp>
#include <solanaceae/util/span.hpp>
#include <vector>
// fwd // fwd
class ToxContactModel2; class ToxContactModel2;
// limit to 2 uploads per peer simultaniously
struct InfoRequest {
uint64_t ts_start{0};
uint64_t ts_end{0};
};
struct SingleMessageRequest {
ByteSpan ppk;
uint32_t mid {0};
uint64_t ts {0}; // deciseconds
};
// TODO: move to own file
namespace Components {
struct IncommingInfoRequestQueue {
std::vector<InfoRequest> _queue;
// we should remove/notadd queued requests
// that are subsets of same or larger ranges
void queueRequest(const InfoRequest& new_request);
};
struct IncommingInfoRequestRunning {
struct Entry {
InfoRequest ir;
std::vector<uint8_t> data; // trasfer data in memory
};
entt::dense_map<uint8_t, Entry> _list;
};
struct IncommingMsgRequestQueue {
// optimize dup lookups (this list could be large)
std::vector<SingleMessageRequest> _queue;
// removes dups
void queueRequest(const SingleMessageRequest& new_request);
};
struct IncommingMsgRequestRunning {
struct Entry {
SingleMessageRequest smr;
std::vector<uint8_t> data; // trasfer data in memory
};
// make more efficent? this list is very short
entt::dense_map<uint8_t, Entry> _list;
};
} // Components
class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI { class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
Contact3Registry& _cr; Contact3Registry& _cr;
@ -71,22 +21,12 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
NGCFT1& _nft; NGCFT1& _nft;
NGCFT1EventProviderI::SubscriptionReference _nftep_sr; NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
float _iterate_heat {0.f};
constexpr static float _iterate_cooldown {1.22f}; // sec
// open/running info requests (by c) // open/running info requests (by c)
// comp on peer c
// open/running info responses (by c) // open/running info responses (by c)
// comp on peer c
// limit to 2 uploads per peer simultaniously static const bool _only_send_self_observed {true};
// TODO: increase for prod (4?) static const int64_t _max_time_into_past_default {60}; // s
// currently per type
constexpr static size_t _max_parallel_per_peer {2};
constexpr static bool _only_send_self_observed {true};
constexpr static int64_t _max_time_into_past_default {60*15}; // s
public: public:
NGCHS2Send( NGCHS2Send(