close files of inactive tranfsers

This commit is contained in:
Green Sky 2025-03-14 13:50:30 +01:00
parent 246587e30d
commit f5f7f2ca9d
No known key found for this signature in database
GPG Key ID: DBE05085D874AB4A
10 changed files with 85 additions and 25 deletions

View File

@ -84,6 +84,9 @@ add_library(solanaceae_sha1_ngcft1
./solanaceae/ngc_ft1_sha1/participation.hpp ./solanaceae/ngc_ft1_sha1/participation.hpp
./solanaceae/ngc_ft1_sha1/participation.cpp ./solanaceae/ngc_ft1_sha1/participation.cpp
./solanaceae/ngc_ft1_sha1/file_inactivity_system.hpp
./solanaceae/ngc_ft1_sha1/file_inactivity_system.cpp
./solanaceae/ngc_ft1_sha1/re_announce_systems.hpp ./solanaceae/ngc_ft1_sha1/re_announce_systems.hpp
./solanaceae/ngc_ft1_sha1/re_announce_systems.cpp ./solanaceae/ngc_ft1_sha1/re_announce_systems.cpp

View File

@ -22,7 +22,7 @@ namespace Backends {
struct SHA1MappedFilesystem_InfoBuilderState { struct SHA1MappedFilesystem_InfoBuilderState {
std::atomic_bool info_builder_dirty {false}; std::atomic_bool info_builder_dirty {false};
std::mutex info_builder_queue_mutex; std::mutex info_builder_queue_mutex;
using InfoBuilderEntry = std::function<void(void)>; using InfoBuilderEntry = std::function<void(float)>;
std::list<InfoBuilderEntry> info_builder_queue; std::list<InfoBuilderEntry> info_builder_queue;
}; };
@ -34,13 +34,13 @@ SHA1MappedFilesystem::SHA1MappedFilesystem(
SHA1MappedFilesystem::~SHA1MappedFilesystem(void) { SHA1MappedFilesystem::~SHA1MappedFilesystem(void) {
} }
void SHA1MappedFilesystem::tick(void) { void SHA1MappedFilesystem::tick(float current_time) {
if (_ibs->info_builder_dirty) { if (_ibs->info_builder_dirty) {
std::lock_guard l{_ibs->info_builder_queue_mutex}; std::lock_guard l{_ibs->info_builder_queue_mutex};
_ibs->info_builder_dirty = false; // set while holding lock _ibs->info_builder_dirty = false; // set while holding lock
for (auto& it : _ibs->info_builder_queue) { for (auto& it : _ibs->info_builder_queue) {
it(); it(current_time);
} }
_ibs->info_builder_queue.clear(); _ibs->info_builder_queue.clear();
} }
@ -72,7 +72,7 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
if (!file_impl->isGood()) { if (!file_impl->isGood()) {
{ {
std::lock_guard l{ibs->info_builder_queue_mutex}; std::lock_guard l{ibs->info_builder_queue_mutex};
ibs->info_builder_queue.push_back([file_path_](){ ibs->info_builder_queue.push_back([file_path_](float){
// back on iterate thread // back on iterate thread
std::cerr << "SHA1MF error: failed opening file '" << file_path_ << "'!\n"; std::cerr << "SHA1MF error: failed opening file '" << file_path_ << "'!\n";
@ -120,7 +120,7 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
file_path_, file_path_,
sha1_info = std::move(sha1_info), sha1_info = std::move(sha1_info),
cb = std::move(cb) cb = std::move(cb)
]() mutable { // ](float current_time) mutable { //
// executed on iterate thread // executed on iterate thread
// reopen, cant move, since std::function needs to be copy consturctable (meh) // reopen, cant move, since std::function needs to be copy consturctable (meh)
@ -196,7 +196,7 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
o.emplace_or_replace<ObjComp::Ephemeral::FilePath>(file_path_); // ? o.emplace_or_replace<ObjComp::Ephemeral::FilePath>(file_path_); // ?
} }
o.emplace_or_replace<Components::FT1File2>(std::move(file_impl)); o.emplace_or_replace<Components::FT1File2>(std::move(file_impl), current_time);
if (!o.all_of<ObjComp::Ephemeral::File::TransferStats>()) { if (!o.all_of<ObjComp::Ephemeral::File::TransferStats>()) {
o.emplace<ObjComp::Ephemeral::File::TransferStats>(); o.emplace<ObjComp::Ephemeral::File::TransferStats>();

View File

@ -22,7 +22,7 @@ struct SHA1MappedFilesystem : public StorageBackendIMeta, public StorageBackendI
// pull from info builder queue // pull from info builder queue
// call from main thread (os thread?) often // call from main thread (os thread?) often
void tick(void); void tick(float current_time);
ObjectHandle newObject(ByteSpan id, bool throw_construct = true) override; ObjectHandle newObject(ByteSpan id, bool throw_construct = true) override;

View File

@ -54,6 +54,9 @@ namespace Components {
// the cached file2 for faster access // the cached file2 for faster access
// should be destroyed when no activity and recreated on demand // should be destroyed when no activity and recreated on demand
std::unique_ptr<File2I> file; std::unique_ptr<File2I> file;
// set to current time on init, read, write
float last_activity_ts {0.f};
}; };
struct FT1ChunkSHA1Requested { struct FT1ChunkSHA1Requested {

View File

@ -0,0 +1,33 @@
#include "./file_inactivity_system.hpp"
#include <solanaceae/object_store/object_store.hpp>
#include <solanaceae/file/file2.hpp>
#include "./components.hpp"
#include <iostream>
namespace Systems {
void file_inactivity(
ObjectRegistry& os_reg,
float current_time
) {
std::vector<Object> to_close;
size_t total {0};
os_reg.view<Components::FT1File2>().each([&os_reg, &to_close, &total, current_time](Object ov, const Components::FT1File2& ft_f) {
if (current_time - ft_f.last_activity_ts >= 30.f) {
// after 30sec of inactivity
to_close.push_back(ov);
}
total++;
});
if (!to_close.empty()) {
std::cout << "SHA1_NGCFT1: closing " << to_close.size() << " out of " << total << " open files\n";
os_reg.remove<Components::FT1File2>(to_close.cbegin(), to_close.cend());
}
}
} // Systems

View File

@ -0,0 +1,13 @@
#pragma once
#include <solanaceae/object_store/fwd.hpp>
namespace Systems {
void file_inactivity(
ObjectRegistry& os_reg,
float current_time
);
} // Systems

View File

@ -37,7 +37,7 @@ SendingTransfers::Entry& SendingTransfers::emplaceInfo(uint32_t group_number, ui
} }
SendingTransfers::Entry& SendingTransfers::emplaceChunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Chunk& chunk) { SendingTransfers::Entry& SendingTransfers::emplaceChunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const Entry::Chunk& chunk) {
assert(!containsPeerChunk(group_number, peer_number, chunk.content, chunk.chunk_index)); assert(!containsPeerChunk(group_number, peer_number, chunk.o, chunk.chunk_index));
auto& ent = _data[combine_ids(group_number, peer_number)][transfer_id]; auto& ent = _data[combine_ids(group_number, peer_number)][transfer_id];
ent.v = chunk; ent.v = chunk;
return ent; return ent;
@ -60,7 +60,7 @@ bool SendingTransfers::containsChunk(ObjectHandle o, size_t chunk_idx) const {
} }
const auto& c = v.getChunk(); const auto& c = v.getChunk();
if (c.content != o) { if (c.o != o) {
continue; continue;
} }
@ -85,7 +85,7 @@ bool SendingTransfers::containsPeerChunk(uint32_t group_number, uint32_t peer_nu
} }
const auto& c = v.getChunk(); const auto& c = v.getChunk();
if (c.content != o) { if (c.o != o) {
continue; continue;
} }

View File

@ -19,7 +19,7 @@ struct SendingTransfers {
}; };
struct Chunk { struct Chunk {
ObjectHandle content; ObjectHandle o;
size_t chunk_index; // <.< remove offset_into_file size_t chunk_index; // <.< remove offset_into_file
//uint64_t offset_into_file; //uint64_t offset_into_file;
// or data? // or data?

View File

@ -30,6 +30,7 @@
#include "./re_announce_systems.hpp" #include "./re_announce_systems.hpp"
#include "./chunk_picker_systems.hpp" #include "./chunk_picker_systems.hpp"
#include "./transfer_stats_systems.hpp" #include "./transfer_stats_systems.hpp"
#include "./file_inactivity_system.hpp"
#include <iostream> #include <iostream>
#include <filesystem> #include <filesystem>
@ -177,11 +178,13 @@ File2I* SHA1_NGCFT1::objGetFile2Write(ObjectHandle o) {
std::cerr << "SHA1_NGCFT1 error: failed to open object for writing\n"; std::cerr << "SHA1_NGCFT1 error: failed to open object for writing\n";
return nullptr; // early out return nullptr; // early out
} }
file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file)); file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file), getTimeNow());
} }
assert(file2_comp_ptr != nullptr); assert(file2_comp_ptr != nullptr);
assert(static_cast<bool>(file2_comp_ptr->file)); assert(static_cast<bool>(file2_comp_ptr->file));
file2_comp_ptr->last_activity_ts = getTimeNow();
return file2_comp_ptr->file.get(); return file2_comp_ptr->file.get();
} }
@ -195,11 +198,13 @@ File2I* SHA1_NGCFT1::objGetFile2Read(ObjectHandle o) {
std::cerr << "SHA1_NGCFT1 error: failed to open object for reading\n"; std::cerr << "SHA1_NGCFT1 error: failed to open object for reading\n";
return nullptr; // early out return nullptr; // early out
} }
file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file)); file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file), getTimeNow());
} }
assert(file2_comp_ptr != nullptr); assert(file2_comp_ptr != nullptr);
assert(static_cast<bool>(file2_comp_ptr->file)); assert(static_cast<bool>(file2_comp_ptr->file));
file2_comp_ptr->last_activity_ts = getTimeNow();
return file2_comp_ptr->file.get(); return file2_comp_ptr->file.get();
} }
@ -259,7 +264,7 @@ SHA1_NGCFT1::SHA1_NGCFT1(
} }
float SHA1_NGCFT1::iterate(float delta) { float SHA1_NGCFT1::iterate(float delta) {
_mfb.tick(); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast _mfb.tick(getTimeNow()); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast
_peer_open_requests.clear(); _peer_open_requests.clear();
@ -467,6 +472,12 @@ float SHA1_NGCFT1::iterate(float delta) {
delta delta
); );
_file_inactivity_timer += delta;
if (_file_inactivity_timer >= 21.554f) {
_file_inactivity_timer = 0.f;
Systems::file_inactivity(_os.registry(), getTimeNow());
}
// transfer statistics systems // transfer statistics systems
Systems::transfer_tally_update(_os.registry(), getTimeNow()); Systems::transfer_tally_update(_os.registry(), getTimeNow());
@ -742,7 +753,7 @@ bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
} }
} }
e.e.emplace_or_replace<Components::FT1File2>(std::move(file_impl)); e.e.emplace_or_replace<Components::FT1File2>(std::move(file_impl), getTimeNow());
// queue announce that we are participating // queue announce that we are participating
e.e.get_or_emplace<Components::ReAnnounceTimer>(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f; e.e.get_or_emplace<Components::ReAnnounceTimer>(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
@ -1042,9 +1053,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
} }
} else if (transfer.isChunk()) { } else if (transfer.isChunk()) {
auto& chunk_transfer = transfer.getChunk(); auto& chunk_transfer = transfer.getChunk();
const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>(); const auto& info = chunk_transfer.o.get<Components::FT1InfoSHA1>();
auto* file2 = objGetFile2Read(chunk_transfer.content); auto* file2 = objGetFile2Read(chunk_transfer.o);
if (file2 == nullptr) { if (file2 == nullptr) {
// return true? // return true?
return false; // early out return false; // early out
@ -1074,7 +1085,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
} }
} }
if (static_cast<bool>(c)) { if (static_cast<bool>(c)) {
chunk_transfer.content.get_or_emplace<Components::TransferStatsTally>() chunk_transfer.o.get_or_emplace<Components::TransferStatsTally>()
.tally[c] .tally[c]
.recently_sent .recently_sent
.push_back( .push_back(
@ -1195,11 +1206,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
o.emplace_or_replace<ObjComp::F::TagLocalHaveAll>(); o.emplace_or_replace<ObjComp::F::TagLocalHaveAll>();
std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n"; std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";
// HACK: close file2, to clear ram
// TODO: just add a lastActivity comp and close files every x minutes based on that
file2 = nullptr; // making sure we dont have a stale ptr
o.remove<Components::FT1File2>(); // will be recreated on demand
break; break;
} }
} }
@ -1276,9 +1282,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
if (transfer.isChunk()) { if (transfer.isChunk()) {
// we could cheat here and assume remote has chunk now // we could cheat here and assume remote has chunk now
_os.throwEventUpdate(transfer.getChunk().content); _os.throwEventUpdate(transfer.getChunk().o);
updateMessages(transfer.getChunk().content); // mostly for sent bytes updateMessages(transfer.getChunk().o); // mostly for sent bytes
} // ignore info transfer for now } // ignore info transfer for now
_sending_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id); _sending_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id);

View File

@ -88,6 +88,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
File2I* objGetFile2Write(ObjectHandle o); File2I* objGetFile2Write(ObjectHandle o);
File2I* objGetFile2Read(ObjectHandle o); File2I* objGetFile2Read(ObjectHandle o);
float _file_inactivity_timer {0.f};
public: // TODO: config public: // TODO: config
bool _udp_only {false}; bool _udp_only {false};