hs2 send done, but untested

This commit is contained in:
Green Sky 2024-12-06 22:41:05 +01:00
parent adeaca4efe
commit 294c5346ca
No known key found for this signature in database
2 changed files with 126 additions and 22 deletions

View File

@ -25,15 +25,15 @@
namespace Components {
void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request) {
void IncommingTimeRangeRequestQueue::queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid) {
// TODO: do more than exact dedupe
for (const auto& [ts_start, ts_end] : _queue) {
if (ts_start == new_request.ts_start && ts_end == new_request.ts_end) {
for (const auto& [time_range, _] : _queue) {
if (time_range.ts_start == new_request.ts_start && time_range.ts_end == new_request.ts_end) {
return; // already enqueued
}
}
_queue.push_back(new_request);
_queue.emplace_back(Entry{new_request, std::vector<uint8_t>{fid.cbegin(), fid.cend()}});
}
} // Components
@ -65,7 +65,7 @@ float NGCHS2Send::iterate(float delta) {
// limit how often we update here (new fts usually)
if (_iterate_heat > 0.f) {
_iterate_heat -= delta;
return 1000.f;
return 1000.f; // return heat?
} else {
_iterate_heat = _iterate_cooldown;
}
@ -84,20 +84,77 @@ float NGCHS2Send::iterate(float delta) {
continue;
}
// new ft here?
// new ft here
// TODO: loop? nah just 1 per tick is enough
const auto request_entry = iirq._queue.front(); // copy
assert(!request_entry.fid.empty());
if (!c.all_of<Contact::Components::Parent>()) {
iirq._queue.pop_front();
continue; // how
}
const Contact3Handle group_c = {*c.registry(), c.get<Contact::Components::Parent>().parent};
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
iirq._queue.pop_front();
continue;
}
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
// TODO: check allowed range here
//_max_time_into_past_default
// potentially heavy op
auto data = buildChatLogFileRange(group_c, request_entry.ir.ts_start, request_entry.ir.ts_end);
uint8_t transfer_id {0};
if (!_nft.NGC_FT1_send_init_private(
group_number, peer_number,
(uint32_t)NGCFT1_file_kind::HS2_RANGE_TIME_MSGPACK,
request_entry.fid.data(), request_entry.fid.size(),
data.size(),
&transfer_id,
true // can_compress (does nothing rn)
)) {
// sending failed, we do not pop but wait for next iterate
// TODO: cache data
// TODO: fail counter
// actually, fail probably means offline, so delete?
continue;
}
assert(iirr._list.count(transfer_id) == 0);
iirr._list[transfer_id] = {request_entry.ir, data};
iirq._queue.pop_front();
}
};
// first handle range requests on weak self
//for (auto&& [cv, iirq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingTimeRangeRequestQueue>().each()) {
fn_iirq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingTimeRangeRequestQueue>());
fn_iirq(_cr.view<Components::IncommingTimeRangeRequestQueue, Contact::Components::TagSelfWeak>());
// we could stop here, if too much is already running
// then range on others
//for (auto&& [cv, iirq] : _cr.view<Components::IncommingTimeRangeRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) {
fn_iirq(_cr.view<Components::IncommingTimeRangeRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}));
_cr.view<Components::IncommingTimeRangeRequestRunning>().each(
[delta](const auto cv, Components::IncommingTimeRangeRequestRunning& irr) {
std::vector<uint8_t> to_remove;
for (auto&& [ft_id, entry] : irr._list) {
entry.last_activity += delta;
if (entry.last_activity >= 60.f) {
to_remove.push_back(ft_id);
}
}
for (const auto it : to_remove) {
std::cout << "NGCHS2Send warning: timed out ." << (int)it << "\n";
// TODO: need a way to tell ft?
irr._list.erase(it);
// technically we are not supposed to timeout and instead rely on the done event
}
}
);
return 1000.f;
}
@ -150,10 +207,10 @@ void NGCHS2Send::handleTimeRange(Contact3Handle c, const Events::NGCFT1_recv_req
// dedupe insert into queue
// how much overlap do we allow?
c.get_or_emplace<Components::IncommingTimeRangeRequestQueue>().queueRequest({
ts_start,
ts_end,
});
c.get_or_emplace<Components::IncommingTimeRangeRequestQueue>().queueRequest(
{ts_start, ts_end},
fid
);
}
#if 0
@ -199,7 +256,7 @@ void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv
}
#endif
std::vector<uint8_t> NGCHS2Send::buildHSFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end) {
std::vector<uint8_t> NGCHS2Send::buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end) {
const Message3Registry* reg_ptr = static_cast<const RegistryMessageModelI&>(_rmm).get(c);
if (reg_ptr == nullptr) {
return {};
@ -358,11 +415,52 @@ bool NGCHS2Send::onEvent(const Events::NGCFT1_recv_request& e) {
return true;
}
bool NGCHS2Send::onEvent(const Events::NGCFT1_send_data&) {
return false;
bool NGCHS2Send::onEvent(const Events::NGCFT1_send_data& e) {
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (!c) {
return false;
}
if (!c.all_of<Components::IncommingTimeRangeRequestRunning>()) {
return false;
}
auto& irr = c.get<Components::IncommingTimeRangeRequestRunning>();
if (!irr._list.count(e.transfer_id)) {
return false; // not for us (maybe)
}
auto& transfer = irr._list.at(e.transfer_id);
if (transfer.data.size() < e.data_offset+e.data_size) {
std::cerr << "NGCHS2Send error: ft send data larger then file???\n";
assert(false && "how");
}
std::memcpy(e.data, transfer.data.data()+e.data_offset, e.data_size);
transfer.last_activity = 0.f;
return true;
}
bool NGCHS2Send::onEvent(const Events::NGCFT1_send_done&) {
return false;
bool NGCHS2Send::onEvent(const Events::NGCFT1_send_done& e) {
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
if (!c) {
return false;
}
if (!c.all_of<Components::IncommingTimeRangeRequestRunning>()) {
return false;
}
auto& irr = c.get<Components::IncommingTimeRangeRequestRunning>();
if (!irr._list.count(e.transfer_id)) {
return false; // not for us (maybe)
}
irr._list.erase(e.transfer_id);
// TODO: check if we completed it
std::cout << "NGCHS2Send: sent chatlog to " << e.group_number << ":" << e.peer_number << "." << (int)e.transfer_id << "\n";
return true;
}

View File

@ -12,6 +12,7 @@
#include <solanaceae/util/span.hpp>
#include <vector>
#include <deque>
// fwd
class ToxContactModel2;
@ -25,17 +26,22 @@ struct TimeRangeRequest {
// TODO: move to own file
namespace Components {
struct IncommingTimeRangeRequestQueue {
std::vector<TimeRangeRequest> _queue;
struct Entry {
TimeRangeRequest ir;
std::vector<uint8_t> fid;
};
std::deque<Entry> _queue;
// we should remove/notadd queued requests
// that are subsets of same or larger ranges
void queueRequest(const TimeRangeRequest& new_request);
void queueRequest(const TimeRangeRequest& new_request, const ByteSpan fid);
};
struct IncommingTimeRangeRequestRunning {
struct Entry {
TimeRangeRequest ir;
std::vector<uint8_t> data; // trasfer data in memory
std::vector<uint8_t> data; // transfer data in memory
float last_activity {0.f};
};
entt::dense_map<uint8_t, Entry> _list;
};
@ -81,7 +87,7 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI {
// msg reg contact
// time ranges
std::vector<uint8_t> buildHSFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end);
[[nodiscard]] std::vector<uint8_t> buildChatLogFileRange(Contact3Handle c, uint64_t ts_start, uint64_t ts_end);
protected:
bool onEvent(const Message::Events::MessageConstruct&) override;