From 54ace9d0b2e98d3b3ff70bd9bc7821b1d150dfdc Mon Sep 17 00:00:00 2001 From: Green Sky Date: Wed, 17 Jul 2024 17:17:07 +0200 Subject: [PATCH] and use new backend code (partially transitioned to os backend) --- solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp | 351 +++++++----------------- solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp | 15 +- 2 files changed, 99 insertions(+), 267 deletions(-) diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp index 1a170b6..21c8230 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp @@ -29,7 +29,6 @@ #include #include -#include #include namespace Message::Components { @@ -193,7 +192,8 @@ SHA1_NGCFT1::SHA1_NGCFT1( _nft(nft), _tcm(tcm), _tep(tep), - _neep(neep) + _neep(neep), + _mfb(os) { // TODO: also create and destroy _rmm.subscribe(this, RegistryMessageModel_Event::message_updated); @@ -221,17 +221,7 @@ SHA1_NGCFT1::SHA1_NGCFT1( float SHA1_NGCFT1::iterate(float delta) { //std::cerr << "---------- new tick ----------\n"; - // info builder queue - if (_info_builder_dirty) { - std::lock_guard l{_info_builder_queue_mutex}; - _info_builder_dirty = false; // set while holding lock - - for (auto& it : _info_builder_queue) { - //it.fn(); - it(); - } - _info_builder_queue.clear(); - } + _mfb.tick(); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast entt::dense_map peer_open_requests; @@ -440,6 +430,89 @@ float SHA1_NGCFT1::iterate(float delta) { } } +// gets called back on main thread after a "new" file info got built on a different thread +void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts) { + // sanity + if (!o.all_of()) { + assert(false); + return; + } + + // update content lookup + const auto& info_hash = o.get().hash; + _info_to_content[info_hash] = o; + + // update chunk lookup + const auto& cc = o.get(); + const auto& info = o.get(); + for (size_t i = 0; i < info.chunks.size(); i++) { + _chunks[info.chunks[i]] = o; + } + + // remove from info request queue + if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), o); it != _queue_content_want_info.end()) { + _queue_content_want_info.erase(it); + } + + // TODO: we dont want chunks anymore + // TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle) + + // something happend, update all chunk pickers + if (o.all_of()) { + for (const auto& pcv : o.get().participants) { + Contact3Handle pch{_cr, pcv}; + assert(static_cast(pch)); + pch.emplace_or_replace(); + } + } + + // create message + const auto c_self = _cr.get(c).self; + if (!_cr.valid(c_self)) { + std::cerr << "SHA1_NGCFT1 error: failed to get self!\n"; + return; + } + + const auto msg_e = reg_ptr->create(); + reg_ptr->emplace(msg_e, c); + reg_ptr->emplace(msg_e, c_self); + reg_ptr->emplace(msg_e, ts); // reactive? + reg_ptr->emplace(msg_e, ts); + + reg_ptr->emplace(msg_e); + reg_ptr->emplace(msg_e); + + o.get_or_emplace().messages.push_back({*reg_ptr, msg_e}); + + //reg_ptr->emplace(e, file_kind); + // file id would be sha1_info hash or something + //reg_ptr->emplace(e, file_id); + + if (_cr.any_of(c)) { + const uint32_t group_number = _cr.get(c).group_number; + uint32_t message_id = 0; + + // TODO: check return + _nft.NGC_FT1_send_message_public(group_number, message_id, static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), info_hash.data(), info_hash.size()); + reg_ptr->emplace(msg_e, message_id); + } else if ( + // non online group + _cr.any_of(c) + ) { + // create msg_id + const uint32_t message_id = randombytes_random(); + reg_ptr->emplace(msg_e, message_id); + } // TODO: else private message + + reg_ptr->get_or_emplace(msg_e).ts.try_emplace(c_self, ts); + reg_ptr->get_or_emplace(msg_e).ts.try_emplace(c_self, ts); + + _rmm.throwEventConstruct(*reg_ptr, msg_e); + + // TODO: place in iterate? + updateMessages(o); +} + bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) { // see tox_transfer_manager.cpp for reference if (!e.e.all_of()) { @@ -990,6 +1063,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { ); } +#if 0 if (!cc.have_all) { // debug print self have set std::cout << "DEBUG print have bitset: s:" << cc.have_chunk.size_bits(); for (size_t i = 0; i < cc.have_chunk.size_bytes(); i++) { @@ -1001,6 +1075,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { } printf("\n"); } +#endif } else { std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n"; } @@ -1176,252 +1251,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std // get current time unix epoch utc uint64_t ts = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - std::thread(std::move([ - // copy everything - self = this, - ts, - c, - reg_ptr, - file_name_ = std::string(file_name), - file_path_ = std::string(file_path) - ]() mutable { - std::unique_ptr file_impl = construct_file2_rw_mapped(file_path_, -1); - if (!file_impl->isGood()) { - { - std::lock_guard l{self->_info_builder_queue_mutex}; - self->_info_builder_queue.push_back([file_path_](){ - // back on iterate thread - - std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n"; - }); - self->_info_builder_dirty = true; // still in scope, set before mutex unlock - } - return; - } - - // 1. build info by hashing all chunks - - FT1InfoSHA1 sha1_info; - // build info - sha1_info.file_name = file_name_; - sha1_info.file_size = file_impl->_file_size; // TODO: remove the reliance on implementation details - - { // build chunks - // HACK: load file fully - // ... its only a hack if its not memory mapped, but reading in chunk_sized chunks is probably a good idea anyway - const auto file_data = file_impl->read(file_impl->_file_size, 0); - size_t i = 0; - for (; i + sha1_info.chunk_size < file_data.size; i += sha1_info.chunk_size) { - sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, sha1_info.chunk_size)); - } - - if (i < file_data.size) { - sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, file_data.size-i)); - } - } - - file_impl.reset(); - - { - std::lock_guard l{self->_info_builder_queue_mutex}; - self->_info_builder_queue.push_back(std::move([ - self, - ts, - c, - reg_ptr, - file_name_, - file_path_, - sha1_info = std::move(sha1_info) - ]() mutable { // - // back on iterate thread - - std::unique_ptr file_impl = construct_file2_rw_mapped(file_path_, sha1_info.file_size); - if (!file_impl->isGood()) { - std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n"; - return; - } - - // 2. hash info - std::vector sha1_info_data; - std::vector sha1_info_hash; - - std::cout << "SHA1_NGCFT1 info is: \n" << sha1_info; - sha1_info_data = sha1_info.toBuffer(); - std::cout << "SHA1_NGCFT1 sha1_info size: " << sha1_info_data.size() << "\n"; - sha1_info_hash = hash_sha1(sha1_info_data.data(), sha1_info_data.size()); - std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n"; - - // check if content exists - ObjectHandle ce; - if (self->_info_to_content.count(sha1_info_hash)) { - ce = self->_info_to_content.at(sha1_info_hash); - - // TODO: check if content is incomplete and use file instead - if (!ce.all_of()) { - ce.emplace(sha1_info); - } - if (!ce.all_of()) { - ce.emplace(sha1_info_data); - } - - // hash has to be set already - // Components::FT1InfoSHA1Hash - - { // lookup tables and have - auto& cc = ce.get_or_emplace(); - cc.have_all = true; - // skip have vec, since all - //cc.have_chunk - cc.have_count = sha1_info.chunks.size(); // need? - - self->_info_to_content[sha1_info_hash] = ce; - cc.chunk_hash_to_index.clear(); // for cpy pst - for (size_t i = 0; i < sha1_info.chunks.size(); i++) { - self->_chunks[sha1_info.chunks[i]] = ce; - cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); - } - } - - { // file info - // TODO: not overwrite fi? since same? - auto& file_info = ce.emplace_or_replace(); - file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size}; - file_info.total_size = file_impl->_file_size; - - ce.emplace_or_replace(std::vector{std::string{file_path_}}); - } - - // cleanup file - if (ce.all_of()) { - // replace - ce.remove(); - } - ce.emplace(std::move(file_impl)); - - if (!ce.all_of()) { - ce.emplace(0u); - } - - ce.remove(); - - // we dont want the info anymore - ce.remove(); - if (auto it = std::find(self->_queue_content_want_info.begin(), self->_queue_content_want_info.end(), ce); it != self->_queue_content_want_info.end()) { - self->_queue_content_want_info.erase(it); - } - - // TODO: we dont want chunks anymore - // TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle) - } else { - // TODO: backend - ce = {self->_os.registry(), self->_os.registry().create()}; - self->_info_to_content[sha1_info_hash] = ce; - - ce.emplace(sha1_info); - ce.emplace(sha1_info_data); // keep around? or file? - ce.emplace(sha1_info_hash); - { // lookup tables and have - auto& cc = ce.emplace(); - cc.have_all = true; - // skip have vec, since all - //cc.have_chunk - cc.have_count = sha1_info.chunks.size(); // need? - - self->_info_to_content[sha1_info_hash] = ce; - cc.chunk_hash_to_index.clear(); // for cpy pst - for (size_t i = 0; i < sha1_info.chunks.size(); i++) { - self->_chunks[sha1_info.chunks[i]] = ce; - cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); - } - } - - { // file info - auto& file_info = ce.emplace(); - //const auto& file = ce.get(); - file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size}; - file_info.total_size = file_impl->_file_size; - - ce.emplace(std::vector{std::string{file_path_}}); - } - - ce.emplace(std::move(file_impl)); - - ce.emplace(0u); - } - - // something happend, update all chunk pickers - if (ce.all_of()) { - for (const auto& pcv : ce.get().participants) { - Contact3Handle pch{self->_cr, pcv}; - assert(static_cast(pch)); - pch.emplace_or_replace(); - } - } - - const auto c_self = self->_cr.get(c).self; - if (!self->_cr.valid(c_self)) { - std::cerr << "SHA1_NGCFT1 error: failed to get self!\n"; - return; - } - - const auto msg_e = reg_ptr->create(); - reg_ptr->emplace(msg_e, c); - reg_ptr->emplace(msg_e, c_self); - reg_ptr->emplace(msg_e, ts); // reactive? - reg_ptr->emplace(msg_e, ts); - - reg_ptr->emplace(msg_e); - reg_ptr->emplace(msg_e); - - ce.get_or_emplace().messages.push_back({*reg_ptr, msg_e}); - - - //reg_ptr->emplace(e, file_kind); - // file id would be sha1_info hash or something - //reg_ptr->emplace(e, file_id); - - // remove? done in updateMessages() anyway - if (ce.all_of()) { - reg_ptr->emplace(msg_e, ce.get()); - } - if (ce.all_of()) { - reg_ptr->emplace(msg_e, ce.get()); - } - if (ce.all_of()) { - reg_ptr->emplace(msg_e, ce.get()); - } - - // TODO: determine if this is true - //reg_ptr->emplace(e); - - if (self->_cr.any_of(c)) { - const uint32_t group_number = self->_cr.get(c).group_number; - uint32_t message_id = 0; - - // TODO: check return - self->_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size()); - reg_ptr->emplace(msg_e, message_id); - } else if ( - // non online group - self->_cr.any_of(c) - ) { - // create msg_id - const uint32_t message_id = randombytes_random(); - reg_ptr->emplace(msg_e, message_id); - } - - reg_ptr->get_or_emplace(msg_e).ts.try_emplace(c_self, ts); - reg_ptr->get_or_emplace(msg_e).ts.try_emplace(c_self, ts); - - self->_rmm.throwEventConstruct(*reg_ptr, msg_e); - - // TODO: place in iterate? - self->updateMessages(ce); - - })); - self->_info_builder_dirty = true; // still in scope, set before mutex unlock - } - })).detach(); + _mfb.newFromFile( + file_name, file_path, + [this, reg_ptr, c, ts](ObjectHandle o) { onSendFileHashFinished(o, reg_ptr, c, ts); } + ); return true; } diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp index e19169d..99d9bf1 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp @@ -13,12 +13,12 @@ #include "./sending_transfers.hpp" #include "./receiving_transfers.hpp" +#include "./backends/sha1_mapped_filesystem.hpp" + #include #include -#include -#include -#include +#include class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public NGCFT1EventI, public NGCEXTEventI { ObjectStore2& _os; @@ -30,6 +30,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public ToxEventProviderI& _tep; NGCEXTEventProvider& _neep; + Backends::SHA1MappedFilesystem _mfb; + std::minstd_rand _rng {1337*11}; using clock = std::chrono::steady_clock; @@ -67,11 +69,6 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public // only used to remove participation on peer exit entt::dense_map _tox_peer_to_contact; - std::atomic_bool _info_builder_dirty {false}; - std::mutex _info_builder_queue_mutex; - using InfoBuilderEntry = std::function; - std::list _info_builder_queue; - void updateMessages(ObjectHandle ce); std::optional> selectPeerForRequest(ObjectHandle ce); @@ -97,6 +94,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public float iterate(float delta); + void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts); + protected: // rmm events (actions) bool onEvent(const Message::Events::MessageUpdated&) override;