Compare commits
2 Commits
37b92f67c8
...
34dc01d4dc
Author | SHA1 | Date | |
---|---|---|---|
|
34dc01d4dc | ||
|
1bf1fbce75 |
@ -133,6 +133,7 @@ void ChunkPicker::updateParticipation(
|
|||||||
|
|
||||||
entt::dense_set<Object> checked;
|
entt::dense_set<Object> checked;
|
||||||
for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) {
|
for (const Object ov : c.get<Contact::Components::FT1Participation>().participating) {
|
||||||
|
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
|
||||||
const ObjectHandle o {objreg, ov};
|
const ObjectHandle o {objreg, ov};
|
||||||
|
|
||||||
if (participating_unfinished.contains(o)) {
|
if (participating_unfinished.contains(o)) {
|
||||||
@ -150,6 +151,21 @@ void ChunkPicker::updateParticipation(
|
|||||||
participating_unfinished.erase(o);
|
participating_unfinished.erase(o);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: optimize this to only change on dirty, or something
|
||||||
|
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
|
||||||
|
Priority prio = o.get<ObjComp::Ephemeral::File::DownloadPriority>().p;
|
||||||
|
|
||||||
|
uint16_t pskips =
|
||||||
|
prio == Priority::HIGHEST ? 0u :
|
||||||
|
prio == Priority::HIGH ? 1u :
|
||||||
|
prio == Priority::NORMAL ? 2u :
|
||||||
|
prio == Priority::LOW ? 4u :
|
||||||
|
8u // LOWEST
|
||||||
|
;
|
||||||
|
|
||||||
|
participating_unfinished.at(o).should_skip = pskips;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
|
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
|
||||||
continue;
|
continue;
|
||||||
@ -160,8 +176,6 @@ void ChunkPicker::updateParticipation(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
|
if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
|
||||||
//using Priority = Components::DownloadPriority::Priority;
|
|
||||||
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
|
|
||||||
Priority prio = Priority::NORMAL;
|
Priority prio = Priority::NORMAL;
|
||||||
|
|
||||||
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
|
if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
|
||||||
@ -239,13 +253,12 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
|||||||
// round robin content (remember last obj)
|
// round robin content (remember last obj)
|
||||||
if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
|
if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
|
||||||
participating_in_last = participating_unfinished.begin()->first;
|
participating_in_last = participating_unfinished.begin()->first;
|
||||||
//participating_in_last = *participating_unfinished.begin();
|
|
||||||
}
|
}
|
||||||
assert(objreg.valid(participating_in_last));
|
assert(objreg.valid(participating_in_last));
|
||||||
|
|
||||||
auto it = participating_unfinished.find(participating_in_last);
|
auto it = participating_unfinished.find(participating_in_last);
|
||||||
// hard limit robin rounds to array size time 20
|
// hard limit robin rounds to array size time 20
|
||||||
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++) {
|
for (size_t i = 0; req_ret.size() < num_requests && i < participating_unfinished.size()*20; i++, it++) {
|
||||||
if (it == participating_unfinished.end()) {
|
if (it == participating_unfinished.end()) {
|
||||||
it = participating_unfinished.begin();
|
it = participating_unfinished.begin();
|
||||||
}
|
}
|
||||||
@ -254,6 +267,7 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
|||||||
it->second.skips++;
|
it->second.skips++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
it->second.skips = 0;
|
||||||
|
|
||||||
ObjectHandle o {objreg, it->first};
|
ObjectHandle o {objreg, it->first};
|
||||||
|
|
||||||
@ -361,7 +375,8 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) {
|
//if (it == participating_unfinished.end() || ++it == participating_unfinished.end()) {
|
||||||
|
if (it == participating_unfinished.end()) {
|
||||||
participating_in_last = entt::null;
|
participating_in_last = entt::null;
|
||||||
} else {
|
} else {
|
||||||
participating_in_last = it->first;
|
participating_in_last = it->first;
|
||||||
|
@ -6,8 +6,35 @@
|
|||||||
|
|
||||||
#include <solanaceae/contact/components.hpp>
|
#include <solanaceae/contact/components.hpp>
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
// https://www.youtube.com/watch?v=AdAqsgga3qo
|
// https://www.youtube.com/watch?v=AdAqsgga3qo
|
||||||
|
|
||||||
|
namespace Components {
|
||||||
|
|
||||||
|
void IncommingInfoRequestQueue::queueRequest(const InfoRequest& new_request) {
|
||||||
|
// TODO: do more than exact dedupe
|
||||||
|
for (const auto& [ts_start, ts_end] : _queue) {
|
||||||
|
if (ts_start == new_request.ts_start && ts_end == new_request.ts_end) {
|
||||||
|
return; // already enqueued
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_queue.push_back(new_request);
|
||||||
|
}
|
||||||
|
|
||||||
|
void IncommingMsgRequestQueue::queueRequest(const SingleMessageRequest& new_request) {
|
||||||
|
for (const auto& [ppk, mid, ts] : _queue) {
|
||||||
|
if (mid == new_request.mid && ts == new_request.ts && ppk == new_request.ppk) {
|
||||||
|
return; // already enqueued
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_queue.push_back(new_request);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // Components
|
||||||
|
|
||||||
|
|
||||||
NGCHS2Send::NGCHS2Send(
|
NGCHS2Send::NGCHS2Send(
|
||||||
Contact3Registry& cr,
|
Contact3Registry& cr,
|
||||||
RegistryMessageModelI& rmm,
|
RegistryMessageModelI& rmm,
|
||||||
@ -34,41 +61,152 @@ NGCHS2Send::~NGCHS2Send(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
float NGCHS2Send::iterate(float delta) {
|
float NGCHS2Send::iterate(float delta) {
|
||||||
|
// limit how often we update here (new fts usually)
|
||||||
|
if (_iterate_heat > 0.f) {
|
||||||
|
_iterate_heat -= delta;
|
||||||
|
return 1000.f;
|
||||||
|
} else {
|
||||||
|
_iterate_heat = _iterate_cooldown;
|
||||||
|
}
|
||||||
|
|
||||||
// work request queue
|
// work request queue
|
||||||
// check if already running, discard
|
// check if already running, discard
|
||||||
|
|
||||||
|
auto fn_iirq = [this](auto&& view) {
|
||||||
|
for (auto&& [cv, iirq] : view.each()) {
|
||||||
|
Contact3Handle c{_cr, cv};
|
||||||
|
auto& iirr = c.get_or_emplace<Components::IncommingInfoRequestRunning>();
|
||||||
|
|
||||||
|
// dedup queued from running
|
||||||
|
|
||||||
|
if (iirr._list.size() >= _max_parallel_per_peer) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// new ft here?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
auto fn_imrq = [this](auto&& view) {
|
||||||
|
for (auto&& [cv, imrq] : view.each()) {
|
||||||
|
Contact3Handle c{_cr, cv};
|
||||||
|
auto& imrr = c.get_or_emplace<Components::IncommingMsgRequestRunning>();
|
||||||
|
|
||||||
|
// dedup queued from running
|
||||||
|
|
||||||
|
if (imrr._list.size() >= _max_parallel_per_peer) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// new ft here?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// first handle range requests on weak self
|
||||||
|
//for (auto&& [cv, iirq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>().each()) {
|
||||||
|
fn_iirq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>());
|
||||||
|
|
||||||
|
// then handle messages on weak self
|
||||||
|
//for (auto&& [cv, imrq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>().each()) {
|
||||||
|
fn_imrq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>());
|
||||||
|
|
||||||
|
// we could stop here, if too much is already running
|
||||||
|
|
||||||
|
// then range on others
|
||||||
|
//for (auto&& [cv, iirq] : _cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
|
||||||
|
fn_iirq(_cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
|
||||||
|
|
||||||
|
// then messages on others
|
||||||
|
//for (auto&& [cv, imrq] : _cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
|
||||||
|
fn_imrq(_cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
|
||||||
|
|
||||||
return 1000.f;
|
return 1000.f;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template<typename Type>
|
||||||
|
static uint64_t deserlSimpleType(ByteSpan bytes) {
|
||||||
|
if (bytes.size < sizeof(Type)) {
|
||||||
|
throw int(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
Type value;
|
||||||
|
|
||||||
|
for (size_t i = 0; i < sizeof(Type); i++) {
|
||||||
|
value |= Type(bytes[i]) << (i*8);
|
||||||
|
}
|
||||||
|
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
static uint32_t deserlMID(ByteSpan mid_bytes) {
|
||||||
|
return deserlSimpleType<uint32_t>(mid_bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
static uint64_t deserlTS(ByteSpan ts_bytes) {
|
||||||
|
return deserlSimpleType<uint64_t>(ts_bytes);
|
||||||
|
}
|
||||||
|
|
||||||
void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
|
void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
|
||||||
ByteSpan fid{e.file_id, e.file_id_size};
|
ByteSpan fid{e.file_id, e.file_id_size};
|
||||||
|
// TODO: better size check
|
||||||
|
if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) {
|
||||||
|
std::cerr << "NGCHS2S error: range not lange enough\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// seconds
|
||||||
|
uint64_t ts_start{0};
|
||||||
|
uint64_t ts_end{0};
|
||||||
|
|
||||||
// parse
|
// parse
|
||||||
// - ts start
|
try {
|
||||||
// - ts end
|
ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)};
|
||||||
|
ts_start = deserlTS(ts_start_bytes);
|
||||||
|
|
||||||
|
ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)};
|
||||||
|
ts_end = deserlTS(ts_end_bytes);
|
||||||
|
} catch (...) {
|
||||||
|
std::cerr << "NGCHS2S error: failed to parse range\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// dedupe insert into queue
|
// dedupe insert into queue
|
||||||
|
// how much overlap do we allow?
|
||||||
|
c.get_or_emplace<Components::IncommingInfoRequestQueue>().queueRequest({
|
||||||
|
ts_start,
|
||||||
|
ts_end,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
|
void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) {
|
||||||
ByteSpan fid{e.file_id, e.file_id_size};
|
ByteSpan fid{e.file_id, e.file_id_size};
|
||||||
|
// TODO: better size check
|
||||||
if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) {
|
if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) {
|
||||||
// error
|
std::cerr << "NGCHS2S error: singlemessage not lange enough\n";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ByteSpan ppk;
|
||||||
|
uint32_t mid {0};
|
||||||
|
uint64_t ts {0}; // deciseconds
|
||||||
|
|
||||||
// parse
|
// parse
|
||||||
|
try {
|
||||||
// - ppk
|
// - ppk
|
||||||
// TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32)
|
// TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32)
|
||||||
ByteSpan ppk{fid.ptr, 32};
|
ppk = {fid.ptr, 32};
|
||||||
|
|
||||||
// - mid
|
// - mid
|
||||||
//static_assert(sizeof(Tox_Group_Message_Id) == sizeof(uint32_t));
|
|
||||||
ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)};
|
ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)};
|
||||||
|
mid = deserlMID(mid_bytes);
|
||||||
|
|
||||||
// - ts
|
// - ts
|
||||||
// uint64_t (seconds? we dont want milliseconds
|
|
||||||
ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)};
|
ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)};
|
||||||
|
ts = deserlTS(ts_bytes);
|
||||||
|
} catch (...) {
|
||||||
|
std::cerr << "NGCHS2S error: failed to parse singlemessage\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// file content
|
// file content
|
||||||
// - message type (text/textaction/file(ft1sha1))
|
// - message type (text/textaction/file(ft1sha1))
|
||||||
@ -81,6 +219,11 @@ void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv
|
|||||||
// for queue, we need group, peer, msg_ppk, msg_mid, msg_ts
|
// for queue, we need group, peer, msg_ppk, msg_mid, msg_ts
|
||||||
|
|
||||||
// dedupe insert into queue
|
// dedupe insert into queue
|
||||||
|
c.get_or_emplace<Components::IncommingMsgRequestQueue>().queueRequest({
|
||||||
|
ppk,
|
||||||
|
mid,
|
||||||
|
ts,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) {
|
bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) {
|
||||||
|
@ -9,10 +9,60 @@
|
|||||||
|
|
||||||
#include <entt/container/dense_map.hpp>
|
#include <entt/container/dense_map.hpp>
|
||||||
|
|
||||||
|
#include <solanaceae/util/span.hpp>
|
||||||
|
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
// fwd
|
// fwd
|
||||||
class ToxContactModel2;
|
class ToxContactModel2;
|
||||||
|
|
||||||
// limit to 2 uploads per peer simultaniously
|
|
||||||
|
struct InfoRequest {
|
||||||
|
uint64_t ts_start{0};
|
||||||
|
uint64_t ts_end{0};
|
||||||
|
};
|
||||||
|
|
||||||
|
struct SingleMessageRequest {
|
||||||
|
ByteSpan ppk;
|
||||||
|
uint32_t mid {0};
|
||||||
|
uint64_t ts {0}; // deciseconds
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: move to own file
|
||||||
|
namespace Components {
|
||||||
|
struct IncommingInfoRequestQueue {
|
||||||
|
std::vector<InfoRequest> _queue;
|
||||||
|
|
||||||
|
// we should remove/notadd queued requests
|
||||||
|
// that are subsets of same or larger ranges
|
||||||
|
void queueRequest(const InfoRequest& new_request);
|
||||||
|
};
|
||||||
|
|
||||||
|
struct IncommingInfoRequestRunning {
|
||||||
|
struct Entry {
|
||||||
|
InfoRequest ir;
|
||||||
|
std::vector<uint8_t> data; // trasfer data in memory
|
||||||
|
};
|
||||||
|
entt::dense_map<uint8_t, Entry> _list;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct IncommingMsgRequestQueue {
|
||||||
|
// optimize dup lookups (this list could be large)
|
||||||
|
std::vector<SingleMessageRequest> _queue;
|
||||||
|
|
||||||
|
// removes dups
|
||||||
|
void queueRequest(const SingleMessageRequest& new_request);
|
||||||
|
};
|
||||||
|
|
||||||
|
struct IncommingMsgRequestRunning {
|
||||||
|
struct Entry {
|
||||||
|
SingleMessageRequest smr;
|
||||||
|
std::vector<uint8_t> data; // trasfer data in memory
|
||||||
|
};
|
||||||
|
// make more efficent? this list is very short
|
||||||
|
entt::dense_map<uint8_t, Entry> _list;
|
||||||
|
};
|
||||||
|
} // Components
|
||||||
|
|
||||||
class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
|
class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
|
||||||
Contact3Registry& _cr;
|
Contact3Registry& _cr;
|
||||||
@ -21,12 +71,22 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
|
|||||||
NGCFT1& _nft;
|
NGCFT1& _nft;
|
||||||
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
|
NGCFT1EventProviderI::SubscriptionReference _nftep_sr;
|
||||||
|
|
||||||
|
float _iterate_heat {0.f};
|
||||||
|
constexpr static float _iterate_cooldown {1.22f}; // sec
|
||||||
|
|
||||||
// open/running info requests (by c)
|
// open/running info requests (by c)
|
||||||
|
// comp on peer c
|
||||||
|
|
||||||
// open/running info responses (by c)
|
// open/running info responses (by c)
|
||||||
|
// comp on peer c
|
||||||
|
|
||||||
static const bool _only_send_self_observed {true};
|
// limit to 2 uploads per peer simultaniously
|
||||||
static const int64_t _max_time_into_past_default {60}; // s
|
// TODO: increase for prod (4?)
|
||||||
|
// currently per type
|
||||||
|
constexpr static size_t _max_parallel_per_peer {2};
|
||||||
|
|
||||||
|
constexpr static bool _only_send_self_observed {true};
|
||||||
|
constexpr static int64_t _max_time_into_past_default {60*15}; // s
|
||||||
|
|
||||||
public:
|
public:
|
||||||
NGCHS2Send(
|
NGCHS2Send(
|
||||||
|
Loading…
Reference in New Issue
Block a user