diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp index c55579b..28b9248 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp @@ -1,5 +1,6 @@ #include "./sha1_ngcft1.hpp" +#include #include #include @@ -20,6 +21,7 @@ #include #include #include +#include namespace Message::Components { @@ -245,6 +247,18 @@ SHA1_NGCFT1::SHA1_NGCFT1( } void SHA1_NGCFT1::iterate(float delta) { + // info builder queue + if (_info_builder_dirty) { + std::lock_guard l{_info_builder_queue_mutex}; + _info_builder_dirty = false; // set while holding lock + + for (auto& it : _info_builder_queue) { + //it.fn(); + it(); + } + _info_builder_queue.clear(); + } + { // timers // sending transfers for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) { @@ -556,7 +570,7 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) { cc.chunk_hash_to_index[info.chunks[i]].push_back(i); } - if (cc.have_count == info.chunks.size()) { + if (cc.have_count >= info.chunks.size()) { cc.have_all = true; } } else { @@ -1095,216 +1109,259 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std return false; } - // TODO: rw? - // TODO: memory mapped would be king - auto file_impl = std::make_unique(file_path); - if (!file_impl->isGood()) { - std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path << "'!\n"; - return true; - } - // get current time unix epoch utc uint64_t ts = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - // 1. build info by hashing all chunks + std::thread(std::move([ + // copy everything + self = this, + ts, + c, + reg_ptr, + file_name_ = std::string(file_name), + file_path_ = std::string(file_path) + ]() mutable { + // TODO: rw? + // TODO: memory mapped would be king + auto file_impl = std::make_unique(file_path_); + if (!file_impl->isGood()) { + { + std::lock_guard l{self->_info_builder_queue_mutex}; + self->_info_builder_queue.push_back([file_path_](){ + // back on iterate thread - FT1InfoSHA1 sha1_info; - // build info - sha1_info.file_name = file_name; - sha1_info.file_size = file_impl->_file_size; - - { // build chunks - // HACK: load file fully - // TODO: the speed is truly horrid - const auto file_data = file_impl->read(0, file_impl->_file_size); - size_t i = 0; - for (; i + sha1_info.chunk_size < file_data.size(); i += sha1_info.chunk_size) { - sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, sha1_info.chunk_size)); + std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n"; + }); + self->_info_builder_dirty = true; // still in scope, set before mutex unlock + } + return; } - if (i < file_data.size()) { - sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, file_data.size()-i)); - } - } + // 1. build info by hashing all chunks - // 2. hash info - std::vector sha1_info_data; - std::vector sha1_info_hash; + FT1InfoSHA1 sha1_info; + // build info + sha1_info.file_name = file_name_; + sha1_info.file_size = file_impl->_file_size; - std::cout << "SHA1_NGCFT1 info is: \n" << sha1_info; - sha1_info_data = sha1_info.toBuffer(); - std::cout << "SHA1_NGCFT1 sha1_info size: " << sha1_info_data.size() << "\n"; - sha1_info_hash = hash_sha1(sha1_info_data.data(), sha1_info_data.size()); - std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n"; + { // build chunks + // HACK: load file fully + // TODO: the speed is truly horrid + const auto file_data = file_impl->read(0, file_impl->_file_size); + size_t i = 0; + for (; i + sha1_info.chunk_size < file_data.size(); i += sha1_info.chunk_size) { + sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, sha1_info.chunk_size)); + } - // check if content exists - ContentHandle ce; - if (_info_to_content.count(sha1_info_hash)) { - ce = _info_to_content.at(sha1_info_hash); - - // TODO: check if content is incomplete and use file instead - if (!ce.all_of()) { - ce.emplace(sha1_info); - } - if (!ce.all_of()) { - ce.emplace(sha1_info_data); - } - - // hash has to be set already - // Components::FT1InfoSHA1Hash - - { // lookup tables and have - auto& cc = ce.get_or_emplace(); - cc.have_all = true; - // skip have vec, since all - //cc.have_chunk - cc.have_count = sha1_info.chunks.size(); // need? - - _info_to_content[sha1_info_hash] = ce; - cc.chunk_hash_to_index.clear(); // for cpy pst - for (size_t i = 0; i < sha1_info.chunks.size(); i++) { - _chunks[sha1_info.chunks[i]] = ce; - cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); + if (i < file_data.size()) { + sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, file_data.size()-i)); } } - { // file info - // TODO: not overwrite fi? since same? - auto& file_info = ce.emplace_or_replace(); - file_info.file_list.emplace_back() = {std::string{file_name}, file_impl->_file_size}; - file_info.total_size = file_impl->_file_size; + file_impl.reset(); - ce.emplace_or_replace(std::vector{std::string{file_path}}); + { + std::lock_guard l{self->_info_builder_queue_mutex}; + self->_info_builder_queue.push_back(std::move([ + self, + ts, + c, + reg_ptr, + file_name_, + file_path_, + sha1_info = std::move(sha1_info) + ]() mutable { // + // back on iterate thread + + auto file_impl = std::make_unique(file_path_); + if (!file_impl->isGood()) { + std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n"; + return; + } + + // 2. hash info + std::vector sha1_info_data; + std::vector sha1_info_hash; + + std::cout << "SHA1_NGCFT1 info is: \n" << sha1_info; + sha1_info_data = sha1_info.toBuffer(); + std::cout << "SHA1_NGCFT1 sha1_info size: " << sha1_info_data.size() << "\n"; + sha1_info_hash = hash_sha1(sha1_info_data.data(), sha1_info_data.size()); + std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n"; + + // check if content exists + ContentHandle ce; + if (self->_info_to_content.count(sha1_info_hash)) { + ce = self->_info_to_content.at(sha1_info_hash); + + // TODO: check if content is incomplete and use file instead + if (!ce.all_of()) { + ce.emplace(sha1_info); + } + if (!ce.all_of()) { + ce.emplace(sha1_info_data); + } + + // hash has to be set already + // Components::FT1InfoSHA1Hash + + { // lookup tables and have + auto& cc = ce.get_or_emplace(); + cc.have_all = true; + // skip have vec, since all + //cc.have_chunk + cc.have_count = sha1_info.chunks.size(); // need? + + self->_info_to_content[sha1_info_hash] = ce; + cc.chunk_hash_to_index.clear(); // for cpy pst + for (size_t i = 0; i < sha1_info.chunks.size(); i++) { + self->_chunks[sha1_info.chunks[i]] = ce; + cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); + } + } + + { // file info + // TODO: not overwrite fi? since same? + auto& file_info = ce.emplace_or_replace(); + file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size}; + file_info.total_size = file_impl->_file_size; + + ce.emplace_or_replace(std::vector{std::string{file_path_}}); + } + + // cleanup file + if (ce.all_of()) { + // replace + ce.remove(); + } + ce.emplace(std::move(file_impl)); + + if (!ce.all_of()) { + ce.emplace(0u); + } + + ce.remove(); + + // we dont want the info anymore + ce.remove(); + if (auto it = std::find(self->_queue_content_want_info.begin(), self->_queue_content_want_info.end(), ce); it != self->_queue_content_want_info.end()) { + self->_queue_content_want_info.erase(it); + } + + // TODO: we dont want chunks anymore + + // TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle) + auto it = self->_queue_content_want_chunk.begin(); + while ( + it != self->_queue_content_want_chunk.end() && + (it = std::find(it, self->_queue_content_want_chunk.end(), ce)) != self->_queue_content_want_chunk.end() + ) { + it = self->_queue_content_want_chunk.erase(it); + } + } else { + ce = {self->_contentr, self->_contentr.create()}; + self->_info_to_content[sha1_info_hash] = ce; + + ce.emplace(sha1_info); + ce.emplace(sha1_info_data); // keep around? or file? + ce.emplace(sha1_info_hash); + { // lookup tables and have + auto& cc = ce.emplace(); + cc.have_all = true; + // skip have vec, since all + //cc.have_chunk + cc.have_count = sha1_info.chunks.size(); // need? + + self->_info_to_content[sha1_info_hash] = ce; + cc.chunk_hash_to_index.clear(); // for cpy pst + for (size_t i = 0; i < sha1_info.chunks.size(); i++) { + self->_chunks[sha1_info.chunks[i]] = ce; + cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); + } + } + + { // file info + auto& file_info = ce.emplace(); + //const auto& file = ce.get(); + file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size}; + file_info.total_size = file_impl->_file_size; + + ce.emplace(std::vector{std::string{file_path_}}); + } + + ce.emplace(std::move(file_impl)); + + ce.emplace(0u); + } + + const auto c_self = self->_cr.get(c).self; + if (!self->_cr.valid(c_self)) { + std::cerr << "SHA1_NGCFT1 error: failed to get self!\n"; + return; + } + + const auto msg_e = reg_ptr->create(); + reg_ptr->emplace(msg_e, c); + reg_ptr->emplace(msg_e, c_self); + reg_ptr->emplace(msg_e, ts); // reactive? + + reg_ptr->emplace(msg_e); + reg_ptr->emplace(msg_e); + + ce.get_or_emplace().messages.push_back({*reg_ptr, msg_e}); + + + //reg_ptr->emplace(e, file_kind); + // file id would be sha1_info hash or something + //reg_ptr->emplace(e, file_id); + + // remove? done in updateMessages() anyway + if (ce.all_of()) { + reg_ptr->emplace(msg_e, ce.get()); + } + if (ce.all_of()) { + reg_ptr->emplace(msg_e, ce.get()); + } + if (ce.all_of()) { + reg_ptr->emplace(msg_e, ce.get()); + } + + // TODO: determine if this is true + //reg_ptr->emplace(e); + + if (self->_cr.any_of(c)) { + const uint32_t group_number = self->_cr.get(c).group_number; + uint32_t message_id = 0; + + // TODO: check return + self->_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size()); + reg_ptr->emplace(msg_e, message_id); + + // TODO: generalize? + auto& synced_by = reg_ptr->emplace(msg_e).list; + synced_by.emplace(c_self); + } else if ( + // non online group + self->_cr.any_of(c) + ) { + // create msg_id + const uint32_t message_id = randombytes_random(); + reg_ptr->emplace(msg_e, message_id); + + // TODO: generalize? + auto& synced_by = reg_ptr->emplace(msg_e).list; + synced_by.emplace(c_self); + } + + self->_rmm.throwEventConstruct(*reg_ptr, msg_e); + + // TODO: place in iterate? + self->updateMessages(ce); + + })); + self->_info_builder_dirty = true; // still in scope, set before mutex unlock } - - // cleanup file - if (ce.all_of()) { - // replace - ce.remove(); - } - ce.emplace(std::move(file_impl)); - - if (!ce.all_of()) { - ce.emplace(0u); - } - - ce.remove(); - - // we dont want the info anymore - ce.remove(); - if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) { - _queue_content_want_info.erase(it); - } - - // TODO: we dont want chunks anymore - - // TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle) - auto it = _queue_content_want_chunk.begin(); - while ( - it != _queue_content_want_chunk.end() && - (it = std::find(it, _queue_content_want_chunk.end(), ce)) != _queue_content_want_chunk.end() - ) { - it = _queue_content_want_chunk.erase(it); - } - } else { - ce = {_contentr, _contentr.create()}; - _info_to_content[sha1_info_hash] = ce; - - ce.emplace(sha1_info); - ce.emplace(sha1_info_data); // keep around? or file? - ce.emplace(sha1_info_hash); - { // lookup tables and have - auto& cc = ce.emplace(); - cc.have_all = true; - // skip have vec, since all - //cc.have_chunk - cc.have_count = sha1_info.chunks.size(); // need? - - _info_to_content[sha1_info_hash] = ce; - cc.chunk_hash_to_index.clear(); // for cpy pst - for (size_t i = 0; i < sha1_info.chunks.size(); i++) { - _chunks[sha1_info.chunks[i]] = ce; - cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); - } - } - - { // file info - auto& file_info = ce.emplace(); - //const auto& file = ce.get(); - file_info.file_list.emplace_back() = {std::string{file_name}, file_impl->_file_size}; - file_info.total_size = file_impl->_file_size; - - ce.emplace(std::vector{std::string{file_path}}); - } - - ce.emplace(std::move(file_impl)); - - ce.emplace(0u); - } - - const auto c_self = _cr.get(c).self; - if (!_cr.valid(c_self)) { - std::cerr << "SHA1_NGCFT1 error: failed to get self!\n"; - return true; - } - - const auto msg_e = reg_ptr->create(); - reg_ptr->emplace(msg_e, c); - reg_ptr->emplace(msg_e, c_self); - reg_ptr->emplace(msg_e, ts); // reactive? - - reg_ptr->emplace(msg_e); - reg_ptr->emplace(msg_e); - - ce.get_or_emplace().messages.push_back({*reg_ptr, msg_e}); - - - //reg_ptr->emplace(e, file_kind); - // file id would be sha1_info hash or something - //reg_ptr->emplace(e, file_id); - - // remove? done in updateMessages() anyway - if (ce.all_of()) { - reg_ptr->emplace(msg_e, ce.get()); - } - if (ce.all_of()) { - reg_ptr->emplace(msg_e, ce.get()); - } - if (ce.all_of()) { - reg_ptr->emplace(msg_e, ce.get()); - } - - // TODO: determine if this is true - //reg_ptr->emplace(e); - - if (_cr.any_of(c)) { - const uint32_t group_number = _cr.get(c).group_number; - uint32_t message_id = 0; - - // TODO: check return - _nft.NGC_FT1_send_message_public(group_number, message_id, static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size()); - reg_ptr->emplace(msg_e, message_id); - - // TODO: generalize? - auto& synced_by = reg_ptr->emplace(msg_e).list; - synced_by.emplace(c_self); - } else if ( - // non online group - _cr.any_of(c) - ) { - // create msg_id - const uint32_t message_id = randombytes_random(); - reg_ptr->emplace(msg_e, message_id); - - // TODO: generalize? - auto& synced_by = reg_ptr->emplace(msg_e).list; - synced_by.emplace(c_self); - } - - _rmm.throwEventConstruct(*reg_ptr, msg_e); - - // TODO: place in iterate? - updateMessages(ce); + })).detach(); return true; } diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp index 8428ef9..f23d42e 100644 --- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp @@ -16,6 +16,9 @@ #include #include +#include +#include +#include enum class Content : uint32_t {}; using ContentRegistry = entt::basic_registry; @@ -93,6 +96,16 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { std::deque _queue_content_want_info; std::deque _queue_content_want_chunk; + std::atomic_bool _info_builder_dirty {false}; + std::mutex _info_builder_queue_mutex; + //struct InfoBuilderEntry { + //// called on completion on the iterate thread + //// (owning) + //std::function fn; + //}; + using InfoBuilderEntry = std::function; + std::list _info_builder_queue; + static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number); void updateMessages(ContentHandle ce); @@ -105,7 +118,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { size_t _max_concurrent_in {4}; size_t _max_concurrent_out {6}; // TODO: probably also includes running transfers rn (meh) - size_t _max_pending_requests {16}; // per content + size_t _max_pending_requests {32}; // per content public: SHA1_NGCFT1(