small hs progress
This commit is contained in:
		| @@ -6,8 +6,35 @@ | |||||||
|  |  | ||||||
| #include <solanaceae/contact/components.hpp> | #include <solanaceae/contact/components.hpp> | ||||||
|  |  | ||||||
|  | #include <iostream> | ||||||
|  |  | ||||||
| // https://www.youtube.com/watch?v=AdAqsgga3qo | // https://www.youtube.com/watch?v=AdAqsgga3qo | ||||||
|  |  | ||||||
|  | namespace Components { | ||||||
|  |  | ||||||
|  | void IncommingInfoRequestQueue::queueRequest(const InfoRequest& new_request) { | ||||||
|  | 	// TODO: do more than exact dedupe | ||||||
|  | 	for (const auto& [ts_start, ts_end] : _queue) { | ||||||
|  | 		if (ts_start == new_request.ts_start && ts_end == new_request.ts_end) { | ||||||
|  | 			return; // already enqueued | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	_queue.push_back(new_request); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | void IncommingMsgRequestQueue::queueRequest(const SingleMessageRequest& new_request) { | ||||||
|  | 	for (const auto& [ppk, mid, ts] : _queue) { | ||||||
|  | 		if (mid == new_request.mid && ts == new_request.ts && ppk == new_request.ppk) { | ||||||
|  | 			return; // already enqueued | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	_queue.push_back(new_request); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | } // Components | ||||||
|  |  | ||||||
|  |  | ||||||
| NGCHS2Send::NGCHS2Send( | NGCHS2Send::NGCHS2Send( | ||||||
| 	Contact3Registry& cr, | 	Contact3Registry& cr, | ||||||
| 	RegistryMessageModelI& rmm, | 	RegistryMessageModelI& rmm, | ||||||
| @@ -34,41 +61,152 @@ NGCHS2Send::~NGCHS2Send(void) { | |||||||
| } | } | ||||||
|  |  | ||||||
| float NGCHS2Send::iterate(float delta) { | float NGCHS2Send::iterate(float delta) { | ||||||
|  | 	// limit how often we update here (new fts usually) | ||||||
|  | 	if (_iterate_heat > 0.f) { | ||||||
|  | 		_iterate_heat -= delta; | ||||||
|  | 		return 1000.f; | ||||||
|  | 	} else { | ||||||
|  | 		_iterate_heat = _iterate_cooldown; | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// work request queue | 	// work request queue | ||||||
| 	// check if already running, discard | 	// check if already running, discard | ||||||
|  |  | ||||||
|  | 	auto fn_iirq = [this](auto&& view) { | ||||||
|  | 		for (auto&& [cv, iirq] : view.each()) { | ||||||
|  | 			Contact3Handle c{_cr, cv}; | ||||||
|  | 			auto& iirr = c.get_or_emplace<Components::IncommingInfoRequestRunning>(); | ||||||
|  |  | ||||||
|  | 			// dedup queued from running | ||||||
|  |  | ||||||
|  | 			if (iirr._list.size() >= _max_parallel_per_peer) { | ||||||
|  | 				continue; | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			// new ft here? | ||||||
|  | 		} | ||||||
|  | 	}; | ||||||
|  |  | ||||||
|  | 	auto fn_imrq = [this](auto&& view) { | ||||||
|  | 		for (auto&& [cv, imrq] : view.each()) { | ||||||
|  | 			Contact3Handle c{_cr, cv}; | ||||||
|  | 			auto& imrr = c.get_or_emplace<Components::IncommingMsgRequestRunning>(); | ||||||
|  |  | ||||||
|  | 			// dedup queued from running | ||||||
|  |  | ||||||
|  | 			if (imrr._list.size() >= _max_parallel_per_peer) { | ||||||
|  | 				continue; | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			// new ft here? | ||||||
|  | 		} | ||||||
|  | 	}; | ||||||
|  |  | ||||||
|  | 	// first handle range requests on weak self | ||||||
|  | 	//for (auto&& [cv, iirq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>().each()) { | ||||||
|  | 	fn_iirq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingInfoRequestQueue>()); | ||||||
|  |  | ||||||
|  | 	// then handle messages on weak self | ||||||
|  | 	//for (auto&& [cv, imrq] : _cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>().each()) { | ||||||
|  | 	fn_imrq(_cr.view<Contact::Components::TagSelfWeak, Components::IncommingMsgRequestQueue>()); | ||||||
|  |  | ||||||
|  | 	// we could stop here, if too much is already running | ||||||
|  |  | ||||||
|  | 	// then range on others | ||||||
|  | 	//for (auto&& [cv, iirq] : _cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) { | ||||||
|  | 	fn_iirq(_cr.view<Components::IncommingInfoRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{})); | ||||||
|  |  | ||||||
|  | 	// then messages on others | ||||||
|  | 	//for (auto&& [cv, imrq] : _cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{}).each()) { | ||||||
|  | 	fn_imrq(_cr.view<Components::IncommingMsgRequestQueue>(entt::exclude_t<Contact::Components::TagSelfWeak>{})); | ||||||
|  |  | ||||||
| 	return 1000.f; | 	return 1000.f; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | template<typename Type> | ||||||
|  | static uint64_t deserlSimpleType(ByteSpan bytes) { | ||||||
|  | 	if (bytes.size < sizeof(Type)) { | ||||||
|  | 		throw int(1); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	Type value; | ||||||
|  |  | ||||||
|  | 	for (size_t i = 0; i < sizeof(Type); i++) { | ||||||
|  | 		value |= Type(bytes[i]) << (i*8); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return value; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | static uint32_t deserlMID(ByteSpan mid_bytes) { | ||||||
|  | 	return deserlSimpleType<uint32_t>(mid_bytes); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | static uint64_t deserlTS(ByteSpan ts_bytes) { | ||||||
|  | 	return deserlSimpleType<uint64_t>(ts_bytes); | ||||||
|  | } | ||||||
|  |  | ||||||
| void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) { | void NGCHS2Send::handleRange(Contact3Handle c, const Events::NGCFT1_recv_request& e) { | ||||||
| 	ByteSpan fid{e.file_id, e.file_id_size}; | 	ByteSpan fid{e.file_id, e.file_id_size}; | ||||||
|  | 	// TODO: better size check | ||||||
|  | 	if (fid.size != sizeof(uint64_t)+sizeof(uint64_t)) { | ||||||
|  | 		std::cerr << "NGCHS2S error: range not lange enough\n"; | ||||||
|  | 		return; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// seconds | ||||||
|  | 	uint64_t ts_start{0}; | ||||||
|  | 	uint64_t ts_end{0}; | ||||||
|  |  | ||||||
| 	// parse | 	// parse | ||||||
| 	// - ts start | 	try { | ||||||
| 	// - ts end | 		ByteSpan ts_start_bytes{fid.ptr, sizeof(uint64_t)}; | ||||||
|  | 		ts_start = deserlTS(ts_start_bytes); | ||||||
|  |  | ||||||
|  | 		ByteSpan ts_end_bytes{ts_start_bytes.ptr+ts_start_bytes.size, sizeof(uint64_t)}; | ||||||
|  | 		ts_end = deserlTS(ts_end_bytes); | ||||||
|  | 	} catch (...) { | ||||||
|  | 		std::cerr << "NGCHS2S error: failed to parse range\n"; | ||||||
|  | 		return; | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// dedupe insert into queue | 	// dedupe insert into queue | ||||||
|  | 	// how much overlap do we allow? | ||||||
|  | 	c.get_or_emplace<Components::IncommingInfoRequestQueue>().queueRequest({ | ||||||
|  | 		ts_start, | ||||||
|  | 		ts_end, | ||||||
|  | 	}); | ||||||
| } | } | ||||||
|  |  | ||||||
| void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) { | void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv_request& e) { | ||||||
| 	ByteSpan fid{e.file_id, e.file_id_size}; | 	ByteSpan fid{e.file_id, e.file_id_size}; | ||||||
|  | 	// TODO: better size check | ||||||
| 	if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) { | 	if (fid.size != 32+sizeof(uint32_t)+sizeof(uint64_t)) { | ||||||
| 		// error | 		std::cerr << "NGCHS2S error: singlemessage not lange enough\n"; | ||||||
| 		return; | 		return; | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	ByteSpan ppk; | ||||||
|  | 	uint32_t mid {0}; | ||||||
|  | 	uint64_t ts {0}; // deciseconds | ||||||
|  |  | ||||||
| 	// parse | 	// parse | ||||||
|  | 	try { | ||||||
| 		// - ppk | 		// - ppk | ||||||
| 		// TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32) | 		// TOX_GROUP_PEER_PUBLIC_KEY_SIZE (32) | ||||||
| 	ByteSpan ppk{fid.ptr, 32}; | 		ppk = {fid.ptr, 32}; | ||||||
|  |  | ||||||
| 		// - mid | 		// - mid | ||||||
| 	//static_assert(sizeof(Tox_Group_Message_Id) == sizeof(uint32_t)); |  | ||||||
| 		ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)}; | 		ByteSpan mid_bytes{fid.ptr+ppk.size, sizeof(uint32_t)}; | ||||||
|  | 		mid = deserlMID(mid_bytes); | ||||||
|  |  | ||||||
| 		// - ts | 		// - ts | ||||||
| 	// uint64_t (seconds? we dont want milliseconds |  | ||||||
| 		ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)}; | 		ByteSpan ts_bytes{mid_bytes.ptr+mid_bytes.size, sizeof(uint64_t)}; | ||||||
|  | 		ts = deserlTS(ts_bytes); | ||||||
|  | 	} catch (...) { | ||||||
|  | 		std::cerr << "NGCHS2S error: failed to parse singlemessage\n"; | ||||||
|  | 		return; | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// file content | 	// file content | ||||||
| 	// - message type (text/textaction/file(ft1sha1)) | 	// - message type (text/textaction/file(ft1sha1)) | ||||||
| @@ -81,6 +219,11 @@ void NGCHS2Send::handleSingleMessage(Contact3Handle c, const Events::NGCFT1_recv | |||||||
| 	// for queue, we need group, peer, msg_ppk, msg_mid, msg_ts | 	// for queue, we need group, peer, msg_ppk, msg_mid, msg_ts | ||||||
|  |  | ||||||
| 	// dedupe insert into queue | 	// dedupe insert into queue | ||||||
|  | 	c.get_or_emplace<Components::IncommingMsgRequestQueue>().queueRequest({ | ||||||
|  | 		ppk, | ||||||
|  | 		mid, | ||||||
|  | 		ts, | ||||||
|  | 	}); | ||||||
| } | } | ||||||
|  |  | ||||||
| bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) { | bool NGCHS2Send::onEvent(const Message::Events::MessageConstruct&) { | ||||||
|   | |||||||
| @@ -9,10 +9,60 @@ | |||||||
|  |  | ||||||
| #include <entt/container/dense_map.hpp> | #include <entt/container/dense_map.hpp> | ||||||
|  |  | ||||||
|  | #include <solanaceae/util/span.hpp> | ||||||
|  |  | ||||||
|  | #include <vector> | ||||||
|  |  | ||||||
| // fwd | // fwd | ||||||
| class ToxContactModel2; | class ToxContactModel2; | ||||||
|  |  | ||||||
| // limit to 2 uploads per peer simultaniously |  | ||||||
|  | struct InfoRequest { | ||||||
|  | 	uint64_t ts_start{0}; | ||||||
|  | 	uint64_t ts_end{0}; | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | struct SingleMessageRequest { | ||||||
|  | 	ByteSpan ppk; | ||||||
|  | 	uint32_t mid {0}; | ||||||
|  | 	uint64_t ts {0}; // deciseconds | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | // TODO: move to own file | ||||||
|  | namespace Components { | ||||||
|  | 	struct IncommingInfoRequestQueue { | ||||||
|  | 		std::vector<InfoRequest> _queue; | ||||||
|  |  | ||||||
|  | 		// we should remove/notadd queued requests | ||||||
|  | 		// that are subsets of same or larger ranges | ||||||
|  | 		void queueRequest(const InfoRequest& new_request); | ||||||
|  | 	}; | ||||||
|  |  | ||||||
|  | 	struct IncommingInfoRequestRunning { | ||||||
|  | 		struct Entry { | ||||||
|  | 			InfoRequest ir; | ||||||
|  | 			std::vector<uint8_t> data; // trasfer data in memory | ||||||
|  | 		}; | ||||||
|  | 		entt::dense_map<uint8_t, Entry> _list; | ||||||
|  | 	}; | ||||||
|  |  | ||||||
|  | 	struct IncommingMsgRequestQueue { | ||||||
|  | 		// optimize dup lookups (this list could be large) | ||||||
|  | 		std::vector<SingleMessageRequest> _queue; | ||||||
|  |  | ||||||
|  | 		// removes dups | ||||||
|  | 		void queueRequest(const SingleMessageRequest& new_request); | ||||||
|  | 	}; | ||||||
|  |  | ||||||
|  | 	struct IncommingMsgRequestRunning { | ||||||
|  | 		struct Entry { | ||||||
|  | 			SingleMessageRequest smr; | ||||||
|  | 			std::vector<uint8_t> data; // trasfer data in memory | ||||||
|  | 		}; | ||||||
|  | 		// make more efficent? this list is very short | ||||||
|  | 		entt::dense_map<uint8_t, Entry> _list; | ||||||
|  | 	}; | ||||||
|  | } // Components | ||||||
|  |  | ||||||
| class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI { | class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI { | ||||||
| 	Contact3Registry& _cr; | 	Contact3Registry& _cr; | ||||||
| @@ -21,12 +71,22 @@ class NGCHS2Send : public RegistryMessageModelEventI, public NGCFT1EventI { | |||||||
| 	NGCFT1& _nft; | 	NGCFT1& _nft; | ||||||
| 	NGCFT1EventProviderI::SubscriptionReference _nftep_sr; | 	NGCFT1EventProviderI::SubscriptionReference _nftep_sr; | ||||||
|  |  | ||||||
|  | 	float _iterate_heat {0.f}; | ||||||
|  | 	constexpr static float _iterate_cooldown {1.22f}; // sec | ||||||
|  |  | ||||||
| 	// open/running info requests (by c) | 	// open/running info requests (by c) | ||||||
|  | 	// comp on peer c | ||||||
|  |  | ||||||
| 	// open/running info responses (by c) | 	// open/running info responses (by c) | ||||||
|  | 	// comp on peer c | ||||||
|  |  | ||||||
| 	static const bool _only_send_self_observed {true}; | 	// limit to 2 uploads per peer simultaniously | ||||||
| 	static const int64_t _max_time_into_past_default {60}; // s | 	// TODO: increase for prod (4?) | ||||||
|  | 	// currently per type | ||||||
|  | 	constexpr static size_t _max_parallel_per_peer {2}; | ||||||
|  |  | ||||||
|  | 	constexpr static bool _only_send_self_observed {true}; | ||||||
|  | 	constexpr static int64_t _max_time_into_past_default {60*15}; // s | ||||||
|  |  | ||||||
| 	public: | 	public: | ||||||
| 		NGCHS2Send( | 		NGCHS2Send( | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user