transfer stats
This commit is contained in:
parent
31253f5708
commit
dd04e6131a
@ -67,6 +67,9 @@ add_library(solanaceae_sha1_ngcft1
|
||||
./solanaceae/ngc_ft1_sha1/participation.hpp
|
||||
./solanaceae/ngc_ft1_sha1/participation.cpp
|
||||
|
||||
./solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp
|
||||
./solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp
|
||||
|
||||
./solanaceae/ngc_ft1_sha1/receiving_transfers.hpp
|
||||
./solanaceae/ngc_ft1_sha1/receiving_transfers.cpp
|
||||
|
||||
|
@ -1,6 +1,8 @@
|
||||
#include "./components.hpp"
|
||||
|
||||
std::vector<size_t> Components::FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const {
|
||||
namespace Components {
|
||||
|
||||
std::vector<size_t> FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const {
|
||||
const auto it = chunk_hash_to_index.find(hash);
|
||||
if (it != chunk_hash_to_index.cend()) {
|
||||
return it->second;
|
||||
@ -9,7 +11,7 @@ std::vector<size_t> Components::FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest
|
||||
}
|
||||
}
|
||||
|
||||
bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
|
||||
bool FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
|
||||
if (have_all) { // short cut
|
||||
return true;
|
||||
}
|
||||
@ -23,3 +25,17 @@ bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
void TransferStatsTally::Peer::trimSent(const float time_now) {
|
||||
while (recently_sent.size() > 4 && time_now - recently_sent.front().time_point > 1.f) {
|
||||
recently_sent.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
void TransferStatsTally::Peer::trimReceived(const float time_now) {
|
||||
while (recently_received.size() > 4 && time_now - recently_received.front().time_point > 1.f) {
|
||||
recently_received.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
} // Components
|
||||
|
||||
|
@ -116,11 +116,17 @@ namespace Components {
|
||||
struct TransferStatsTally {
|
||||
struct Peer {
|
||||
struct Entry {
|
||||
float time_since {0.f};
|
||||
float time_point {0.f};
|
||||
size_t bytes {0u};
|
||||
bool accounted {false};
|
||||
};
|
||||
std::deque<Entry> recently_sent;
|
||||
std::deque<Entry> recently_received;
|
||||
|
||||
// keep atleast 4 or 1sec
|
||||
// trim too old front
|
||||
void trimSent(const float time_now);
|
||||
void trimReceived(const float time_now);
|
||||
};
|
||||
entt::dense_map<Contact3, Peer> tally;
|
||||
};
|
||||
|
@ -23,6 +23,8 @@
|
||||
#include "./chunk_picker.hpp"
|
||||
#include "./participation.hpp"
|
||||
|
||||
#include "./transfer_stats_systems.hpp"
|
||||
|
||||
#include <iostream>
|
||||
#include <variant>
|
||||
#include <filesystem>
|
||||
@ -567,6 +569,9 @@ float SHA1_NGCFT1::iterate(float delta) {
|
||||
}
|
||||
}
|
||||
|
||||
// transfer statistics systems
|
||||
Systems::transfer_tally_update(_os.registry(), getTimeNow());
|
||||
|
||||
if (_peer_open_requests.empty()) {
|
||||
return 2.f;
|
||||
} else {
|
||||
@ -938,6 +943,20 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
|
||||
std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
|
||||
}
|
||||
}
|
||||
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
if (static_cast<bool>(c)) {
|
||||
o.get_or_emplace<Components::TransferStatsTally>()
|
||||
.tally[c]
|
||||
.recently_received
|
||||
.push_back(
|
||||
Components::TransferStatsTally::Peer::Entry{
|
||||
float(getTimeNow()),
|
||||
e.data_size
|
||||
}
|
||||
)
|
||||
;
|
||||
}
|
||||
} else {
|
||||
assert(false && "unhandled case");
|
||||
}
|
||||
@ -990,6 +1009,20 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
|
||||
//// was last read (probably TODO: add transfer destruction event)
|
||||
//peer.erase(e.transfer_id);
|
||||
//}
|
||||
|
||||
auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||
if (static_cast<bool>(c)) {
|
||||
chunk_transfer.content.get_or_emplace<Components::TransferStatsTally>()
|
||||
.tally[c]
|
||||
.recently_sent
|
||||
.push_back(
|
||||
Components::TransferStatsTally::Peer::Entry{
|
||||
float(getTimeNow()),
|
||||
data.size
|
||||
}
|
||||
)
|
||||
;
|
||||
}
|
||||
} else {
|
||||
assert(false && "not implemented?");
|
||||
}
|
||||
|
@ -35,6 +35,12 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
||||
|
||||
std::minstd_rand _rng {1337*11};
|
||||
|
||||
using clock = std::chrono::steady_clock;
|
||||
clock::time_point _time_start_offset {clock::now()};
|
||||
float getTimeNow(void) const {
|
||||
return std::chrono::duration<float>{clock::now() - _time_start_offset}.count();
|
||||
}
|
||||
|
||||
// limit this to each group?
|
||||
entt::dense_map<SHA1Digest, ObjectHandle> _info_to_content;
|
||||
|
||||
|
112
solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp
Normal file
112
solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp
Normal file
@ -0,0 +1,112 @@
|
||||
#include "./transfer_stats_systems.hpp"
|
||||
|
||||
#include "./components.hpp"
|
||||
#include <iostream>
|
||||
|
||||
namespace Systems {
|
||||
|
||||
void transfer_tally_update(ObjectRegistry& os_reg, const float time_now) {
|
||||
std::vector<Object> tally_to_remove;
|
||||
// for each tally -> stats separated
|
||||
os_reg.view<Components::TransferStatsTally>().each([&os_reg, time_now, &tally_to_remove](const auto ov, Components::TransferStatsTally& tally_comp) {
|
||||
// for each peer
|
||||
std::vector<Contact3> to_remove;
|
||||
for (auto&& [peer_c, peer] : tally_comp.tally) {
|
||||
auto& tss = os_reg.get_or_emplace<Components::TransferStatsSeparated>(ov).stats;
|
||||
|
||||
// special logic
|
||||
// if newest older than 2sec
|
||||
// discard
|
||||
|
||||
if (!peer.recently_sent.empty() && time_now - peer.recently_sent.back().time_point >= 2.f) {
|
||||
// clean up stale
|
||||
auto peer_in_stats_it = tss.find(peer_c);
|
||||
if (peer_in_stats_it != tss.end()) {
|
||||
peer_in_stats_it->second.rate_up = 0.f;
|
||||
}
|
||||
|
||||
peer.recently_sent.clear();
|
||||
if (peer.recently_received.empty()) {
|
||||
to_remove.push_back(peer_c);
|
||||
}
|
||||
} else {
|
||||
// else trim too old front
|
||||
peer.trimSent(time_now);
|
||||
|
||||
size_t tally_bytes {0u};
|
||||
for (auto& [time, bytes, accounted] : peer.recently_sent) {
|
||||
if (!accounted) {
|
||||
tss[peer_c].total_up += bytes;
|
||||
accounted = true;
|
||||
}
|
||||
tally_bytes += bytes;
|
||||
}
|
||||
|
||||
tss[peer_c].rate_up = tally_bytes / (time_now - peer.recently_sent.front().time_point + 0.00001f);
|
||||
}
|
||||
|
||||
if (!peer.recently_received.empty() && time_now - peer.recently_received.back().time_point >= 2.f) {
|
||||
// clean up stale
|
||||
auto peer_in_stats_it = tss.find(peer_c);
|
||||
if (peer_in_stats_it != tss.end()) {
|
||||
peer_in_stats_it->second.rate_down = 0.f;
|
||||
}
|
||||
|
||||
peer.recently_received.clear();
|
||||
if (peer.recently_sent.empty()) {
|
||||
to_remove.push_back(peer_c);
|
||||
}
|
||||
} else {
|
||||
// else trim too old front
|
||||
peer.trimReceived(time_now);
|
||||
|
||||
size_t tally_bytes {0u};
|
||||
for (auto& [time, bytes, accounted] : peer.recently_received) {
|
||||
if (!accounted) {
|
||||
tss[peer_c].total_down += bytes;
|
||||
accounted = true;
|
||||
}
|
||||
tally_bytes += bytes;
|
||||
}
|
||||
|
||||
tss[peer_c].rate_down = tally_bytes / (time_now - peer.recently_received.front().time_point + 0.00001f);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto c : to_remove) {
|
||||
tally_comp.tally.erase(c);
|
||||
}
|
||||
|
||||
if (tally_comp.tally.empty()) {
|
||||
tally_to_remove.push_back(ov);
|
||||
}
|
||||
});
|
||||
|
||||
// for each stats separated -> stats (total)
|
||||
os_reg.view<Components::TransferStatsSeparated, Components::TransferStatsTally>().each([&os_reg](const auto ov, Components::TransferStatsSeparated& tss_comp, const auto&) {
|
||||
Components::TransferStats& stats = os_reg.get_or_emplace<Components::TransferStats>(ov);
|
||||
stats = {}; // reset
|
||||
|
||||
for (const auto& [_, peer_stats] : tss_comp.stats) {
|
||||
stats.rate_up += peer_stats.rate_up;
|
||||
stats.rate_down += peer_stats.rate_down;
|
||||
stats.total_up += peer_stats.total_up;
|
||||
stats.total_down += peer_stats.total_down;
|
||||
}
|
||||
|
||||
#if 0
|
||||
std::cout << "updated stats:\n"
|
||||
<< " rate u:" << stats.rate_up/1024 << "KiB/s d:" << stats.rate_down/1024 << "KiB/s\n"
|
||||
<< " total u:" << stats.total_up/1024 << "KiB d:" << stats.total_down/1024 << "KiB\n"
|
||||
;
|
||||
#endif
|
||||
});
|
||||
|
||||
|
||||
for (const auto ov : tally_to_remove) {
|
||||
os_reg.remove<Components::TransferStatsTally>(ov);
|
||||
}
|
||||
}
|
||||
|
||||
} // Systems
|
||||
|
11
solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp
Normal file
11
solanaceae/ngc_ft1_sha1/transfer_stats_systems.hpp
Normal file
@ -0,0 +1,11 @@
|
||||
#pragma once
|
||||
|
||||
#include <solanaceae/object_store/object_store.hpp>
|
||||
|
||||
namespace Systems {
|
||||
|
||||
// time only needs to be relative
|
||||
void transfer_tally_update(ObjectRegistry& os_reg, const float time_now);
|
||||
|
||||
} // Systems
|
||||
|
Loading…
x
Reference in New Issue
Block a user