From f5f7f2ca9d7044ca19ea77c91cc6c9537d00ef8c Mon Sep 17 00:00:00 2001 From: Green Sky Date: Fri, 14 Mar 2025 13:50:30 +0100 Subject: [PATCH] close files of inactive tranfsers --- CMakeLists.txt | 3 ++ .../backends/sha1_mapped_filesystem.cpp | 12 +++---- .../backends/sha1_mapped_filesystem.hpp | 2 +- solanaceae/ngc_ft1_sha1/components.hpp | 3 ++ .../ngc_ft1_sha1/file_inactivity_system.cpp | 33 ++++++++++++++++++ .../ngc_ft1_sha1/file_inactivity_system.hpp | 13 +++++++ solanaceae/ngc_ft1_sha1/sending_transfers.cpp | 6 ++-- solanaceae/ngc_ft1_sha1/sending_transfers.hpp | 2 +- solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp | 34 +++++++++++-------- solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp | 2 ++ 10 files changed, 85 insertions(+), 25 deletions(-) create mode 100644 solanaceae/ngc_ft1_sha1/file_inactivity_system.cpp create mode 100644 solanaceae/ngc_ft1_sha1/file_inactivity_system.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 61a8a4f..d41dfdf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -84,6 +84,9 @@ add_library(solanaceae_sha1_ngcft1 ./solanaceae/ngc_ft1_sha1/participation.hpp ./solanaceae/ngc_ft1_sha1/participation.cpp + ./solanaceae/ngc_ft1_sha1/file_inactivity_system.hpp + ./solanaceae/ngc_ft1_sha1/file_inactivity_system.cpp + ./solanaceae/ngc_ft1_sha1/re_announce_systems.hpp ./solanaceae/ngc_ft1_sha1/re_announce_systems.cpp diff --git a/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.cpp b/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.cpp index f354664..9967f68 100644 --- a/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.cpp +++ b/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.cpp @@ -22,7 +22,7 @@ namespace Backends { struct SHA1MappedFilesystem_InfoBuilderState { std::atomic_bool info_builder_dirty {false}; std::mutex info_builder_queue_mutex; - using InfoBuilderEntry = std::function; + using InfoBuilderEntry = std::function; std::list info_builder_queue; }; @@ -34,13 +34,13 @@ SHA1MappedFilesystem::SHA1MappedFilesystem( SHA1MappedFilesystem::~SHA1MappedFilesystem(void) { } -void SHA1MappedFilesystem::tick(void) { +void SHA1MappedFilesystem::tick(float current_time) { if (_ibs->info_builder_dirty) { std::lock_guard l{_ibs->info_builder_queue_mutex}; _ibs->info_builder_dirty = false; // set while holding lock for (auto& it : _ibs->info_builder_queue) { - it(); + it(current_time); } _ibs->info_builder_queue.clear(); } @@ -72,7 +72,7 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v if (!file_impl->isGood()) { { std::lock_guard l{ibs->info_builder_queue_mutex}; - ibs->info_builder_queue.push_back([file_path_](){ + ibs->info_builder_queue.push_back([file_path_](float){ // back on iterate thread std::cerr << "SHA1MF error: failed opening file '" << file_path_ << "'!\n"; @@ -120,7 +120,7 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v file_path_, sha1_info = std::move(sha1_info), cb = std::move(cb) - ]() mutable { // + ](float current_time) mutable { // // executed on iterate thread // reopen, cant move, since std::function needs to be copy consturctable (meh) @@ -196,7 +196,7 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v o.emplace_or_replace(file_path_); // ? } - o.emplace_or_replace(std::move(file_impl)); + o.emplace_or_replace(std::move(file_impl), current_time); if (!o.all_of()) { o.emplace(); diff --git a/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.hpp b/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.hpp index 67cc975..6b96390 100644 --- a/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.hpp +++ b/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.hpp @@ -22,7 +22,7 @@ struct SHA1MappedFilesystem : public StorageBackendIMeta, public StorageBackendI // pull from info builder queue // call from main thread (os thread?) often - void tick(void); + void tick(float current_time); ObjectHandle newObject(ByteSpan id, bool throw_construct = true) override; diff --git a/solanaceae/ngc_ft1_sha1/components.hpp b/solanaceae/ngc_ft1_sha1/components.hpp index 2850be0..f51f5ec 100644 --- a/solanaceae/ngc_ft1_sha1/components.hpp +++ b/solanaceae/ngc_ft1_sha1/components.hpp @@ -54,6 +54,9 @@ namespace Components { // the cached file2 for faster access // should be destroyed when no activity and recreated on demand std::unique_ptr file; + + // set to current time on init, read, write + float last_activity_ts {0.f}; }; struct FT1ChunkSHA1Requested { diff --git a/solanaceae/ngc_ft1_sha1/file_inactivity_system.cpp b/solanaceae/ngc_ft1_sha1/file_inactivity_system.cpp new file mode 100644 index 0000000..85b62cc --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/file_inactivity_system.cpp @@ -0,0 +1,33 @@ +#include "./file_inactivity_system.hpp" + +#include +#include + +#include "./components.hpp" + +#include + +namespace Systems { + +void file_inactivity( + ObjectRegistry& os_reg, + float current_time +) { + std::vector to_close; + size_t total {0}; + os_reg.view().each([&os_reg, &to_close, &total, current_time](Object ov, const Components::FT1File2& ft_f) { + if (current_time - ft_f.last_activity_ts >= 30.f) { + // after 30sec of inactivity + to_close.push_back(ov); + } + total++; + }); + + if (!to_close.empty()) { + std::cout << "SHA1_NGCFT1: closing " << to_close.size() << " out of " << total << " open files\n"; + os_reg.remove(to_close.cbegin(), to_close.cend()); + } +} + +} // Systems + diff --git a/solanaceae/ngc_ft1_sha1/file_inactivity_system.hpp b/solanaceae/ngc_ft1_sha1/file_inactivity_system.hpp new file mode 100644 index 0000000..b967d62 --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/file_inactivity_system.hpp @@ -0,0 +1,13 @@ +#pragma once + +#include + +namespace Systems { + +void file_inactivity( + ObjectRegistry& os_reg, + float current_time +); + +} // Systems + diff --git a/solanaceae/ngc_ft1_sha1/sending_transfers.cpp b/solanaceae/ngc_ft1_sha1/sending_transfers.cpp index 63a3515..d376dd7 100644 --- a/solanaceae/ngc_ft1_sha1/sending_transfers.cpp +++ b/solanaceae/ngc_ft1_sha1/sending_transfers.cpp @@ -37,7 +37,7 @@ SendingTransfers::Entry& SendingTransfers::emplaceInfo(uint32_t group_number, ui } SendingTransfers::Entry& SendingTransfers::emplaceChunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Chunk& chunk) { - assert(!containsPeerChunk(group_number, peer_number, chunk.content, chunk.chunk_index)); + assert(!containsPeerChunk(group_number, peer_number, chunk.o, chunk.chunk_index)); auto& ent = _data[combine_ids(group_number, peer_number)][transfer_id]; ent.v = chunk; return ent; @@ -60,7 +60,7 @@ bool SendingTransfers::containsChunk(ObjectHandle o, size_t chunk_idx) const { } const auto& c = v.getChunk(); - if (c.content != o) { + if (c.o != o) { continue; } @@ -85,7 +85,7 @@ bool SendingTransfers::containsPeerChunk(uint32_t group_number, uint32_t peer_nu } const auto& c = v.getChunk(); - if (c.content != o) { + if (c.o != o) { continue; } diff --git a/solanaceae/ngc_ft1_sha1/sending_transfers.hpp b/solanaceae/ngc_ft1_sha1/sending_transfers.hpp index bcf8645..78364b8 100644 --- a/solanaceae/ngc_ft1_sha1/sending_transfers.hpp +++ b/solanaceae/ngc_ft1_sha1/sending_transfers.hpp @@ -19,7 +19,7 @@ struct SendingTransfers { }; struct Chunk { - ObjectHandle content; + ObjectHandle o; size_t chunk_index; // <.< remove offset_into_file //uint64_t offset_into_file; // or data? diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp index 4fc22cd..3958432 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp @@ -30,6 +30,7 @@ #include "./re_announce_systems.hpp" #include "./chunk_picker_systems.hpp" #include "./transfer_stats_systems.hpp" +#include "./file_inactivity_system.hpp" #include #include @@ -177,11 +178,13 @@ File2I* SHA1_NGCFT1::objGetFile2Write(ObjectHandle o) { std::cerr << "SHA1_NGCFT1 error: failed to open object for writing\n"; return nullptr; // early out } - file2_comp_ptr = &o.emplace_or_replace(std::move(new_file)); + file2_comp_ptr = &o.emplace_or_replace(std::move(new_file), getTimeNow()); } assert(file2_comp_ptr != nullptr); assert(static_cast(file2_comp_ptr->file)); + file2_comp_ptr->last_activity_ts = getTimeNow(); + return file2_comp_ptr->file.get(); } @@ -195,11 +198,13 @@ File2I* SHA1_NGCFT1::objGetFile2Read(ObjectHandle o) { std::cerr << "SHA1_NGCFT1 error: failed to open object for reading\n"; return nullptr; // early out } - file2_comp_ptr = &o.emplace_or_replace(std::move(new_file)); + file2_comp_ptr = &o.emplace_or_replace(std::move(new_file), getTimeNow()); } assert(file2_comp_ptr != nullptr); assert(static_cast(file2_comp_ptr->file)); + file2_comp_ptr->last_activity_ts = getTimeNow(); + return file2_comp_ptr->file.get(); } @@ -259,7 +264,7 @@ SHA1_NGCFT1::SHA1_NGCFT1( } float SHA1_NGCFT1::iterate(float delta) { - _mfb.tick(); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast + _mfb.tick(getTimeNow()); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast _peer_open_requests.clear(); @@ -467,6 +472,12 @@ float SHA1_NGCFT1::iterate(float delta) { delta ); + _file_inactivity_timer += delta; + if (_file_inactivity_timer >= 21.554f) { + _file_inactivity_timer = 0.f; + Systems::file_inactivity(_os.registry(), getTimeNow()); + } + // transfer statistics systems Systems::transfer_tally_update(_os.registry(), getTimeNow()); @@ -742,7 +753,7 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) { } } - e.e.emplace_or_replace(std::move(file_impl)); + e.e.emplace_or_replace(std::move(file_impl), getTimeNow()); // queue announce that we are participating e.e.get_or_emplace(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f; @@ -1042,9 +1053,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { } } else if (transfer.isChunk()) { auto& chunk_transfer = transfer.getChunk(); - const auto& info = chunk_transfer.content.get(); + const auto& info = chunk_transfer.o.get(); - auto* file2 = objGetFile2Read(chunk_transfer.content); + auto* file2 = objGetFile2Read(chunk_transfer.o); if (file2 == nullptr) { // return true? return false; // early out @@ -1074,7 +1085,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { } } if (static_cast(c)) { - chunk_transfer.content.get_or_emplace() + chunk_transfer.o.get_or_emplace() .tally[c] .recently_sent .push_back( @@ -1195,11 +1206,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { o.emplace_or_replace(); std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n"; - - // HACK: close file2, to clear ram - // TODO: just add a lastActivity comp and close files every x minutes based on that - file2 = nullptr; // making sure we dont have a stale ptr - o.remove(); // will be recreated on demand break; } } @@ -1276,9 +1282,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) { if (transfer.isChunk()) { // we could cheat here and assume remote has chunk now - _os.throwEventUpdate(transfer.getChunk().content); + _os.throwEventUpdate(transfer.getChunk().o); - updateMessages(transfer.getChunk().content); // mostly for sent bytes + updateMessages(transfer.getChunk().o); // mostly for sent bytes } // ignore info transfer for now _sending_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id); diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp index 9d90027..a024faa 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp @@ -88,6 +88,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public File2I* objGetFile2Write(ObjectHandle o); File2I* objGetFile2Read(ObjectHandle o); + float _file_inactivity_timer {0.f}; + public: // TODO: config bool _udp_only {false};