Compare commits

..

2 Commits

Author SHA1 Message Date
Green Sky
34dc01d4dc
fix chunk picker round robin actually working
(worked kinda bc of bug before)
make priority dynamic and fix skipping to make it work too
2024-11-22 13:51:48 +01:00
Green Sky
1bf1fbce75
small hs progress 2024-11-22 13:51:25 +01:00
3 changed files with 238 additions and 20 deletions

View File

@ -133,6 +133,7 @@ void ChunkPicker::updateParticipation(
entt::dense_set<Object> checked; entt::dense_set<Object> checked;
for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) { for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) {
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
const ObjectHandle o {objreg, ov}; const ObjectHandle o {objreg, ov};
if (participating_unfinished.contains(o)) { if (participating_unfinished.contains(o)) {
@ -150,6 +151,21 @@ void ChunkPicker::updateParticipation(
participating_unfinished.erase(o); participating_unfinished.erase(o);
continue; continue;
} }
// TODO: optimize this to only change on dirty, or something
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
Priority prio = o.get<ObjComp::Ephemeral::File::DownloadPriority>().p;
uint16_t pskips =
prio == Priority::HIGHEST ? 0u :
prio == Priority::HIGH ? 1u :
prio == Priority::NORMAL ? 2u :
prio == Priority::LOW ? 4u :
8u // LOWEST
;
participating_unfinished.at(o).should_skip = pskips;
}
} else { } else {
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) { if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
continue; continue;
@ -160,8 +176,6 @@ void ChunkPicker::updateParticipation(
} }
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) { if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
//using Priority = Components::DownloadPriority::Priority;
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
Priority prio = Priority::NORMAL; Priority prio = Priority::NORMAL;
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) { if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
@ -239,13 +253,12 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
// round robin content (remember last obj) // round robin content (remember last obj)
if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) { if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
participating_in_last = participating_unfinished.begin()->first; participating_in_last = participating_unfinished.begin()->first;
//participating_in_last = *participating_unfinished.begin();
} }
assert(objreg.valid(participating_in_last)); assert(objreg.valid(participating_in_last));
auto it = participating_unfinished.find(participating_in_last); auto it = participating_unfinished.find(participating_in_last);
// hard limit robin rounds to array size time 20 // hard limit robin rounds to array size time 20
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++) { for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++, it++) {
if (it == participating_unfinished.end()) { if (it == participating_unfinished.end()) {
it = participating_unfinished.begin(); it = participating_unfinished.begin();
} }
@ -254,6 +267,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
it->second.skips++; it->second.skips++;
continue; continue;
} }
it->second.skips = 0;
ObjectHandle o {objreg, it->first}; ObjectHandle o {objreg, it->first};
@ -361,7 +375,8 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
} }
} }
if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) { //if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) {
if (it == participating_unfinished.end()) {
participating_in_last = entt::null; participating_in_last = entt::null;
} else { } else {
participating_in_last = it->first; participating_in_last = it->first;

View File

@ -6,8 +6,35 @@
#include <solanaceae/contact/components.hpp> #include <solanaceae/contact/components.hpp>
#include <iostream>
// https://www.youtube.com/watch?v=AdAqsgga3qo // https://www.youtube.com/watch?v=AdAqsgga3qo
namespace Components {
void IncommingInfoRequestQueue::queueRequest(const InfoRequest& new_request) {
// TODO: do more than exact dedupe
for (const auto& [ts_start, ts_end] : _queue) {
if (ts_start == new_request.ts_start && ts_end == new_request.ts_end) {
return; // already enqueued
}
}
_queue.push_back(new_request);
}
void IncommingMsgRequestQueue::queueRequest(const SingleMessageRequest& new_request) {
for (const auto& [ppk, mid, ts] : _queue) {
if (mid == new_request.mid && ts == new_request.ts && ppk == new_request.ppk) {
return; // already enqueued
}
}
_queue.push_back(new_request);
}
} // Components
NGCHS2Send::NGCHS2Send( NGCHS2Send::NGCHS2Send(
Contact3Registry& cr, Contact3Registry& cr,
RegistryMessageModelI& rmm, RegistryMessageModelI& rmm,
@ -34,41 +61,152 @@ NGCHS2Send::~NGCHS2Send(void) {
} }
float NGCHS2Send::iterate(float delta) { float NGCHS2Send::iterate(float delta) {
// limit how often we update here (new fts usually)
if (_iterate_heat > 0.f) {
_iterate_heat -= delta;
return 1000.f;
} else {
_iterate_heat = _iterate_cooldown;
}
// work request queue // work request queue
// check if already running, discard // check if already running, discard
auto fn_iirq = [this](auto&& view) {
for (auto&& [cv, iirq] : view.each()) {
Contact3Handle c{_cr, cv};
auto& iirr = c.get_or_emplace<Components::IncommingInfoRequestRunning>();
// dedup queued from running
if (iirr._list.size() >= _max_parallel_per_peer) {
continue;
}
// new ft here?
}
};
auto fn_imrq = [this](auto&& view) {
for (auto&& [cv, imrq] : view.each()) {
Contact3Handle c{_cr, cv};
auto& imrr = c.get_or_emplace<Components::IncommingMsgRequestRunning>();
// dedup queued from running
if (imrr._list.size() >= _max_parallel_per_peer) {
continue;
}
// new ft here?
}
};
// first handle range requests on weak self
//for (auto&& [cv, iirq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>().each()) {
fn_iirq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>());
// then handle messages on weak self
//for (auto&& [cv, imrq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>().each()) {
fn_imrq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>());
// we could stop here, if too much is already running
// then range on others
//for (auto&& [cv, iirq] : _cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
fn_iirq(_cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
// then messages on others
//for (auto&& [cv, imrq] : _cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
fn_imrq(_cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
return 1000.f; return 1000.f;
} }
template<typename Type>
static uint64_t deserlSimpleType(ByteSpan bytes) {
if (bytes.size < sizeof(Type)) {
throw int(1);
}
Type value;
for (size_t i = 0; i < sizeof(Type); i++) {
value |= Type(bytes[i]) << (i*8);
}
return value;
}
static uint32_t deserlMID(ByteSpan mid_bytes) {
return deserlSimpleType<uint32_t>(mid_bytes);
}
static uint64_t deserlTS(ByteSpan ts_bytes) {
return deserlSimpleType<uint64_t>(ts_bytes);
}
void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) { void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
ByteSpan fid{e.file_id, e.file_id_size}; ByteSpan fid{e.file_id, e.file_id_size};
// TODO: better size check
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
std::cerr << "NGCHS2S error: range not lange enough\n";
return;
}
// seconds
uint64_t ts_start{0};
uint64_t ts_end{0};
// parse // parse
// - ts start try {
// - ts end ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)};
ts_start = deserlTS(ts_start_bytes);
ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)};
ts_end = deserlTS(ts_end_bytes);
} catch (...) {
std::cerr << "NGCHS2S error: failed to parse range\n";
return;
}
// dedupe insert into queue // dedupe insert into queue
// how much overlap do we allow?
c.get_or_emplace<Components::IncommingInfoRequestQueue>().queueRequest({
ts_start,
ts_end,
});
} }
void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) { void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
ByteSpan fid{e.file_id, e.file_id_size}; ByteSpan fid{e.file_id, e.file_id_size};
// TODO: better size check
if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) { if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) {
// error std::cerr << "NGCHS2S error: singlemessage not lange enough\n";
return; return;
} }
ByteSpan ppk;
uint32_t mid {0};
uint64_t ts {0}; // deciseconds
// parse // parse
try {
// - ppk // - ppk
// TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32) // TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32)
ByteSpan ppk{fid.ptr, 32}; ppk = {fid.ptr, 32};
// - mid // - mid
//static_assert(sizeof(Tox_Group_Message_Id) == sizeof(uint32_t));
ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)}; ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)};
mid = deserlMID(mid_bytes);
// - ts // - ts
// uint64_t (seconds? we dont want milliseconds
ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)}; ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)};
ts = deserlTS(ts_bytes);
} catch (...) {
std::cerr << "NGCHS2S error: failed to parse singlemessage\n";
return;
}
// file content // file content
// - message type (text/textaction/file(ft1sha1)) // - message type (text/textaction/file(ft1sha1))
@ -81,6 +219,11 @@ void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv
// for queue, we need group, peer, msg_ppk, msg_mid, msg_ts // for queue, we need group, peer, msg_ppk, msg_mid, msg_ts
// dedupe insert into queue // dedupe insert into queue
c.get_or_emplace<Components::IncommingMsgRequestQueue>().queueRequest({
ppk,
mid,
ts,
});
} }
bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) { bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) {

View File

@ -9,10 +9,60 @@
#include <entt/container/dense_map.hpp> #include <entt/container/dense_map.hpp>
#include <solanaceae/util/span.hpp>
#include <vector>
// fwd // fwd
class ToxContactModel2; class ToxContactModel2;
// limit to 2 uploads per peer simultaniously
struct InfoRequest {
uint64_t ts_start{0};
uint64_t ts_end{0};
};
struct SingleMessageRequest {
ByteSpan ppk;
uint32_t mid {0};
uint64_t ts {0}; // deciseconds
};
// TODO: move to own file
namespace Components {
struct IncommingInfoRequestQueue {
std::vector<InfoRequest> _queue;
// we should remove/notadd queued requests
// that are subsets of same or larger ranges
void queueRequest(const InfoRequest& new_request);
};
struct IncommingInfoRequestRunning {
struct Entry {
InfoRequest ir;
std::vector<uint8_t> data; // trasfer data in memory
};
entt::dense_map<uint8_t, Entry> _list;
};
struct IncommingMsgRequestQueue {
// optimize dup lookups (this list could be large)
std::vector<SingleMessageRequest> _queue;
// removes dups
void queueRequest(const SingleMessageRequest& new_request);
};
struct IncommingMsgRequestRunning {
struct Entry {
SingleMessageRequest smr;
std::vector<uint8_t> data; // trasfer data in memory
};
// make more efficent? this list is very short
entt::dense_map<uint8_t, Entry> _list;
};
} // Components
class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI { class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
Contact3Registry& _cr; Contact3Registry& _cr;
@ -21,12 +71,22 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
NGCFT1& _nft; NGCFT1& _nft;
NGCFT1EventProviderI::SubscriptionReference _nftep_sr; NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
float _iterate_heat {0.f};
constexpr static float _iterate_cooldown {1.22f}; // sec
// open/running info requests (by c) // open/running info requests (by c)
// comp on peer c
// open/running info responses (by c) // open/running info responses (by c)
// comp on peer c
static const bool _only_send_self_observed {true}; // limit to 2 uploads per peer simultaniously
static const int64_t _max_time_into_past_default {60}; // s // TODO: increase for prod (4?)
// currently per type
constexpr static size_t _max_parallel_per_peer {2};
constexpr static bool _only_send_self_observed {true};
constexpr static int64_t _max_time_into_past_default {60*15}; // s
public: public:
NGCHS2Send( NGCHS2Send(