From dd04e6131af0d1bca7db0132adca65ce9b5d458d Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sat, 13 Jul 2024 11:46:33 +0200 Subject: [PATCH] transfer stats --- CMakeLists.txt | 3 + solanaceae/ngc_ft1_sha1/components.cpp | 20 +++- solanaceae/ngc_ft1_sha1/components.hpp | 8 +- solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp | 33 ++++++ solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp | 6 + .../ngc_ft1_sha1/transfer_stats_systems.cpp | 112 ++++++++++++++++++ .../ngc_ft1_sha1/transfer_stats_systems.hpp | 11 ++ 7 files changed, 190 insertions(+), 3 deletions(-) create mode 100644 solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp create mode 100644 solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1ff7b0d..b4b8a0b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -67,6 +67,9 @@ add_library(solanaceae_sha1_ngcft1 ./solanaceae/ngc_ft1_sha1/participation.hpp ./solanaceae/ngc_ft1_sha1/participation.cpp + ./solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp + ./solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp + ./solanaceae/ngc_ft1_sha1/receiving_transfers.hpp ./solanaceae/ngc_ft1_sha1/receiving_transfers.cpp diff --git a/solanaceae/ngc_ft1_sha1/components.cpp b/solanaceae/ngc_ft1_sha1/components.cpp index 5fbf038..ba31d3a 100644 --- a/solanaceae/ngc_ft1_sha1/components.cpp +++ b/solanaceae/ngc_ft1_sha1/components.cpp @@ -1,6 +1,8 @@ #include "./components.hpp" -std::vector Components::FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const { +namespace Components { + +std::vector FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const { const auto it = chunk_hash_to_index.find(hash); if (it != chunk_hash_to_index.cend()) { return it->second; @@ -9,7 +11,7 @@ std::vector Components::FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest } } -bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const { +bool FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const { if (have_all) { // short cut return true; } @@ -23,3 +25,17 @@ bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const { return false; } +void TransferStatsTally::Peer::trimSent(const float time_now) { + while (recently_sent.size() > 4 && time_now - recently_sent.front().time_point > 1.f) { + recently_sent.pop_front(); + } +} + +void TransferStatsTally::Peer::trimReceived(const float time_now) { + while (recently_received.size() > 4 && time_now - recently_received.front().time_point > 1.f) { + recently_received.pop_front(); + } +} + +} // Components + diff --git a/solanaceae/ngc_ft1_sha1/components.hpp b/solanaceae/ngc_ft1_sha1/components.hpp index d428791..ca81241 100644 --- a/solanaceae/ngc_ft1_sha1/components.hpp +++ b/solanaceae/ngc_ft1_sha1/components.hpp @@ -116,11 +116,17 @@ namespace Components { struct TransferStatsTally { struct Peer { struct Entry { - float time_since {0.f}; + float time_point {0.f}; size_t bytes {0u}; + bool accounted {false}; }; std::deque recently_sent; std::deque recently_received; + + // keep atleast 4 or 1sec + // trim too old front + void trimSent(const float time_now); + void trimReceived(const float time_now); }; entt::dense_map tally; }; diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp index 1f2e5db..e032e6e 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp @@ -23,6 +23,8 @@ #include "./chunk_picker.hpp" #include "./participation.hpp" +#include "./transfer_stats_systems.hpp" + #include #include #include @@ -567,6 +569,9 @@ float SHA1_NGCFT1::iterate(float delta) { } } + // transfer statistics systems + Systems::transfer_tally_update(_os.registry(), getTimeNow()); + if (_peer_open_requests.empty()) { return 2.f; } else { @@ -938,6 +943,20 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) { std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n"; } } + + auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); + if (static_cast(c)) { + o.get_or_emplace() + .tally[c] + .recently_received + .push_back( + Components::TransferStatsTally::Peer::Entry{ + float(getTimeNow()), + e.data_size + } + ) + ; + } } else { assert(false && "unhandled case"); } @@ -990,6 +1009,20 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { //// was last read (probably TODO: add transfer destruction event) //peer.erase(e.transfer_id); //} + + auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); + if (static_cast(c)) { + chunk_transfer.content.get_or_emplace() + .tally[c] + .recently_sent + .push_back( + Components::TransferStatsTally::Peer::Entry{ + float(getTimeNow()), + data.size + } + ) + ; + } } else { assert(false && "not implemented?"); } diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp index e1c4ad4..0878135 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp @@ -35,6 +35,12 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public std::minstd_rand _rng {1337*11}; + using clock = std::chrono::steady_clock; + clock::time_point _time_start_offset {clock::now()}; + float getTimeNow(void) const { + return std::chrono::duration{clock::now() - _time_start_offset}.count(); + } + // limit this to each group? entt::dense_map _info_to_content; diff --git a/solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp b/solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp new file mode 100644 index 0000000..ff1454b --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp @@ -0,0 +1,112 @@ +#include "./transfer_stats_systems.hpp" + +#include "./components.hpp" +#include + +namespace Systems { + +void transfer_tally_update(ObjectRegistry& os_reg, const float time_now) { + std::vector tally_to_remove; + // for each tally -> stats separated + os_reg.view().each([&os_reg, time_now, &tally_to_remove](const auto ov, Components::TransferStatsTally& tally_comp) { + // for each peer + std::vector to_remove; + for (auto&& [peer_c, peer] : tally_comp.tally) { + auto& tss = os_reg.get_or_emplace(ov).stats; + + // special logic + // if newest older than 2sec + // discard + + if (!peer.recently_sent.empty() && time_now - peer.recently_sent.back().time_point >= 2.f) { + // clean up stale + auto peer_in_stats_it = tss.find(peer_c); + if (peer_in_stats_it != tss.end()) { + peer_in_stats_it->second.rate_up = 0.f; + } + + peer.recently_sent.clear(); + if (peer.recently_received.empty()) { + to_remove.push_back(peer_c); + } + } else { + // else trim too old front + peer.trimSent(time_now); + + size_t tally_bytes {0u}; + for (auto& [time, bytes, accounted] : peer.recently_sent) { + if (!accounted) { + tss[peer_c].total_up += bytes; + accounted = true; + } + tally_bytes += bytes; + } + + tss[peer_c].rate_up = tally_bytes / (time_now - peer.recently_sent.front().time_point + 0.00001f); + } + + if (!peer.recently_received.empty() && time_now - peer.recently_received.back().time_point >= 2.f) { + // clean up stale + auto peer_in_stats_it = tss.find(peer_c); + if (peer_in_stats_it != tss.end()) { + peer_in_stats_it->second.rate_down = 0.f; + } + + peer.recently_received.clear(); + if (peer.recently_sent.empty()) { + to_remove.push_back(peer_c); + } + } else { + // else trim too old front + peer.trimReceived(time_now); + + size_t tally_bytes {0u}; + for (auto& [time, bytes, accounted] : peer.recently_received) { + if (!accounted) { + tss[peer_c].total_down += bytes; + accounted = true; + } + tally_bytes += bytes; + } + + tss[peer_c].rate_down = tally_bytes / (time_now - peer.recently_received.front().time_point + 0.00001f); + } + } + + for (const auto c : to_remove) { + tally_comp.tally.erase(c); + } + + if (tally_comp.tally.empty()) { + tally_to_remove.push_back(ov); + } + }); + + // for each stats separated -> stats (total) + os_reg.view().each([&os_reg](const auto ov, Components::TransferStatsSeparated& tss_comp, const auto&) { + Components::TransferStats& stats = os_reg.get_or_emplace(ov); + stats = {}; // reset + + for (const auto& [_, peer_stats] : tss_comp.stats) { + stats.rate_up += peer_stats.rate_up; + stats.rate_down += peer_stats.rate_down; + stats.total_up += peer_stats.total_up; + stats.total_down += peer_stats.total_down; + } + +#if 0 + std::cout << "updated stats:\n" + << " rate u:" << stats.rate_up/1024 << "KiB/s d:" << stats.rate_down/1024 << "KiB/s\n" + << " total u:" << stats.total_up/1024 << "KiB d:" << stats.total_down/1024 << "KiB\n" + ; +#endif + }); + + + for (const auto ov : tally_to_remove) { + os_reg.remove(ov); + } +} + +} // Systems + diff --git a/solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp b/solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp new file mode 100644 index 0000000..3f72bb8 --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp @@ -0,0 +1,11 @@ +#pragma once + +#include + +namespace Systems { + +// time only needs to be relative +void transfer_tally_update(ObjectRegistry& os_reg, const float time_now); + +} // Systems +