diff --git a/src/sha1_ngcft1.cpp b/src/sha1_ngcft1.cpp index 1b4df3e..3288b63 100644 --- a/src/sha1_ngcft1.cpp +++ b/src/sha1_ngcft1.cpp @@ -17,8 +17,19 @@ #include #include +namespace Message::Components { + + using Content = ContentHandle; + +} // Message::Components + +// TODO: rename to content components namespace Components { + struct Messages { + std::vector messages; + }; + using FT1InfoSHA1 = FT1InfoSHA1; struct FT1InfoSHA1Data { @@ -73,7 +84,7 @@ static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) { } } -void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, Message3Handle msg, const SHA1Digest& hash) { +void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash) { // TODO: transfers for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) { // if already in queue @@ -85,13 +96,34 @@ void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_numbe } // not in queue yet - _queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, msg, hash, 0.f)); + _queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, content, hash, 0.f)); } uint64_t SHA1_NGCFT1::combineIds(const uint32_t group_number, const uint32_t peer_number) { return (uint64_t(group_number) << 32) | peer_number; } +void SHA1_NGCFT1::updateMessages(ContentHandle ce) { + assert(ce.all_of()); + + for (auto msg : ce.get().messages) { + if (ce.all_of() && !msg.all_of()) { + msg.emplace(ce.get()); + } + if (ce.all_of()) { + msg.emplace_or_replace(ce.get()); + } + if (ce.all_of()) { + msg.emplace_or_replace(ce.get()); + } + if (auto* cc = ce.try_get(); cc != nullptr && cc->have_all) { + msg.emplace_or_replace(); + } + + _rmm.throwEventUpdate(msg); + } +} + SHA1_NGCFT1::SHA1_NGCFT1( Contact3Registry& cr, RegistryMessageModel& rmm, @@ -245,14 +277,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { } SHA1Digest info_hash{e.file_id, e.file_id_size}; - if (!_info_to_message.count(info_hash)) { + if (!_info_to_content.count(info_hash)) { // we dont know about this return false; } - auto msg = _info_to_message.at(info_hash); + auto content = _info_to_content.at(info_hash); - assert(msg.all_of()); + if (!content.all_of()) { + // we dont have the info for that infohash (yet?) + return false; + } // TODO: queue instead //queueUpRequestInfo(e.group_number, e.peer_number, info_hash); @@ -261,14 +296,14 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { e.group_number, e.peer_number, static_cast(e.file_kind), e.file_id, e.file_id_size, - msg.get().data.size(), + content.get().data.size(), &transfer_id ); _sending_transfers [combineIds(e.group_number, e.peer_number)] [transfer_id] - .v = SendingTransfer::Info{msg.get().data}; + .v = SendingTransfer::Info{content.get().data}; } else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) { if (e.file_id_size != 20) { // error @@ -337,14 +372,16 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { } } else if (std::holds_alternative(transfer.v)) { auto& chunk_transfer = std::get(transfer.v); - const auto data = chunk_transfer.msg.get()->read(chunk_transfer.offset_into_file + e.data_offset, e.data_size); + // TODO: should we really use file? + const auto data = chunk_transfer.content.get()->read(chunk_transfer.offset_into_file + e.data_offset, e.data_size); // TODO: optimize for (size_t i = 0; i < e.data_size && i < data.size(); i++) { e.data[i] = data[i]; } - chunk_transfer.msg.get_or_emplace().total += data.size(); + chunk_transfer.content.get_or_emplace().total += data.size(); + // TODO: add event to propergate to messages //_rmm.throwEventUpdate(transfer); // should we? //if (e.data_offset + e.data_size >= *insert chunk size here*) { @@ -411,11 +448,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) { reg.emplace(new_msg_e); // add sending? - //reg_ptr->emplace(e, sha1_info); - //reg_ptr->emplace(e, sha1_info_data); // keep around? or file? - reg.emplace(new_msg_e, std::vector{e.file_id, e.file_id+e.file_id_size}); - // TODO: queue info dl - reg.emplace(new_msg_e, ts); //reg.emplace(new_msg_e, 0); reg.emplace(new_msg_e, ts); // reactive? @@ -425,6 +457,58 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) { synced_by.emplace(self_c); } + // check if content exists + const auto sha1_info_hash = std::vector{e.file_id, e.file_id+e.file_id_size}; + ContentHandle ce; + if (_info_to_content.count(sha1_info_hash)) { + ce = _info_to_content.at(sha1_info_hash); + std::cout << "SHA1_NGCFT1: new message has existing content\n"; + } else { + ce = {_contentr, _contentr.create()}; + _info_to_content[sha1_info_hash] = ce; + std::cout << "SHA1_NGCFT1: new message has new content\n"; + + //ce.emplace(sha1_info); + //ce.emplace(sha1_info_data); // keep around? or file? + ce.emplace(sha1_info_hash); + //{ // lookup tables and have + //auto& cc = ce.emplace(); + //cc.have_all = true; + //// skip have vec, since all + ////cc.have_chunk + //cc.have_count = sha1_info.chunks.size(); // need? + + //_info_to_content[sha1_info_hash] = ce; + //for (size_t i = 0; i < sha1_info.chunks.size(); i++) { + //_chunks[sha1_info.chunks[i]] = ce; + //cc.chunk_hash_to_index[sha1_info.chunks[i]] = i; + //} + //} + + // TODO: ft1 specific comp + //ce.emplace(std::move(file_impl)); + } + ce.get_or_emplace().messages.push_back({reg, new_msg_e}); + + // TODO: queue info dl + + //reg_ptr->emplace(e, sha1_info); + //reg_ptr->emplace(e, sha1_info_data); // keep around? or file? + //reg.emplace(new_msg_e, std::vector{e.file_id, e.file_id+e.file_id_size}); + + if (auto* cc = ce.try_get(); cc != nullptr && cc->have_all) { + reg_ptr->emplace(new_msg_e); + } + + if (ce.all_of()) { + reg_ptr->emplace(new_msg_e, ce.get()); + } + if (ce.all_of()) { + reg_ptr->emplace(new_msg_e, ce.get()); + } + if (ce.all_of()) { + reg_ptr->emplace(new_msg_e, ce.get()); + } // TODO: queue info/check if we already have info @@ -448,7 +532,8 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std return false; } - // TODO: rw + // TODO: rw? + // TODO: memory mapped would be king auto file_impl = std::make_unique(file_path); if (!file_impl->isGood()) { std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path << "'!\n"; @@ -489,56 +574,127 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std sha1_info_hash = hash_sha1(sha1_info_data.data(), sha1_info_data.size()); std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n"; + // check if content exists + ContentHandle ce; + if (_info_to_content.count(sha1_info_hash)) { + ce = _info_to_content.at(sha1_info_hash); + + // TODO: check if content is incomplete and use file instead + if (!ce.all_of()) { + ce.emplace(sha1_info); + } + if (!ce.all_of()) { + ce.emplace(sha1_info_data); + } + + // hash has to be set already + // Components::FT1InfoSHA1Hash + + { // lookup tables and have + auto& cc = ce.get_or_emplace(); + cc.have_all = true; + // skip have vec, since all + //cc.have_chunk + cc.have_count = sha1_info.chunks.size(); // need? + + _info_to_content[sha1_info_hash] = ce; + for (size_t i = 0; i < sha1_info.chunks.size(); i++) { + _chunks[sha1_info.chunks[i]] = ce; + cc.chunk_hash_to_index[sha1_info.chunks[i]] = i; + } + } + + { // file info + // TODO: not overwrite fi? since same? + auto& file_info = ce.emplace_or_replace(); + file_info.file_list.emplace_back() = {std::string{file_name}, file_impl->_file_size}; + file_info.total_size = file_impl->_file_size; + + ce.emplace_or_replace(std::vector{std::string{file_path}}); + } + + // cleanup file + if (ce.all_of()) { + // replace + ce.remove(); + } + ce.emplace(std::move(file_impl)); + + if (!ce.all_of()) { + ce.emplace(0u); + } + + // TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle) + } else { + ce = {_contentr, _contentr.create()}; + _info_to_content[sha1_info_hash] = ce; + + ce.emplace(sha1_info); + ce.emplace(sha1_info_data); // keep around? or file? + ce.emplace(sha1_info_hash); + { // lookup tables and have + auto& cc = ce.emplace(); + cc.have_all = true; + // skip have vec, since all + //cc.have_chunk + cc.have_count = sha1_info.chunks.size(); // need? + + _info_to_content[sha1_info_hash] = ce; + for (size_t i = 0; i < sha1_info.chunks.size(); i++) { + _chunks[sha1_info.chunks[i]] = ce; + cc.chunk_hash_to_index[sha1_info.chunks[i]] = i; + } + } + + { // file info + auto& file_info = ce.emplace(); + //const auto& file = ce.get(); + file_info.file_list.emplace_back() = {std::string{file_name}, file_impl->_file_size}; + file_info.total_size = file_impl->_file_size; + + ce.emplace(std::vector{std::string{file_path}}); + } + + ce.emplace(std::move(file_impl)); + + ce.emplace(0u); + } + const auto c_self = _cr.get(c).self; if (!_cr.valid(c_self)) { std::cerr << "SHA1_NGCFT1 error: failed to get self!\n"; return true; } - const auto e = reg_ptr->create(); - reg_ptr->emplace(e, c); - reg_ptr->emplace(e, c_self); - reg_ptr->emplace(e, ts); // reactive? - reg_ptr->emplace(e); - reg_ptr->emplace(e); + const auto msg_e = reg_ptr->create(); + reg_ptr->emplace(msg_e, c); + reg_ptr->emplace(msg_e, c_self); + reg_ptr->emplace(msg_e, ts); // reactive? - reg_ptr->emplace(e, sha1_info); - reg_ptr->emplace(e, sha1_info_data); // keep around? or file? - reg_ptr->emplace(e, sha1_info_hash); - { // lookup tables and have - auto& cc = reg_ptr->emplace(e); - cc.have_all = true; - // skip have vec, since all - //cc.have_chunk - cc.have_count = sha1_info.chunks.size(); // need? + reg_ptr->emplace(msg_e); + reg_ptr->emplace(msg_e); + + ce.get_or_emplace().messages.push_back({*reg_ptr, msg_e}); - _info_to_message[sha1_info_hash] = {*reg_ptr, e}; - for (size_t i = 0; i < sha1_info.chunks.size(); i++) { - _chunks[sha1_info.chunks[i]] = {*reg_ptr, e}; - cc.chunk_hash_to_index[sha1_info.chunks[i]] = i; - } - } //reg_ptr->emplace(e, file_kind); // file id would be sha1_info hash or something //reg_ptr->emplace(e, file_id); - { // file info - auto& file_info = reg_ptr->emplace(e); - file_info.file_list.emplace_back() = {std::string{file_name}, file_impl->_file_size}; - file_info.total_size = file_impl->_file_size; - - reg_ptr->emplace(e, std::vector{std::string{file_path}}); + if (ce.all_of()) { + reg_ptr->emplace(msg_e, ce.get()); + } + if (ce.all_of()) { + reg_ptr->emplace(msg_e, ce.get()); + } + if (ce.all_of()) { + reg_ptr->emplace(msg_e, ce.get()); } - - reg_ptr->emplace(e); // TODO: determine if this is true //reg_ptr->emplace(e); - // TODO: ft1 specific comp - reg_ptr->emplace(e, std::move(file_impl)); #if 0 const auto friend_number = _cr.get(c).friend_number; const auto&& [transfer_id, err] = _t.toxFileSend(friend_number, file_kind, file_impl->_file_size, file_id, file_name); @@ -556,10 +712,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std // TODO: check return _nft.NGC_FT1_send_message_public(group_number, message_id, static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size()); - reg_ptr->emplace(e, message_id); + reg_ptr->emplace(msg_e, message_id); // TODO: generalize? - auto& synced_by = reg_ptr->emplace(e).list; + auto& synced_by = reg_ptr->emplace(msg_e).list; synced_by.emplace(c_self); } else if ( // non online group @@ -567,14 +723,17 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std ) { // create msg_id const uint32_t message_id = randombytes_random(); - reg_ptr->emplace(e, message_id); + reg_ptr->emplace(msg_e, message_id); // TODO: generalize? - auto& synced_by = reg_ptr->emplace(e).list; + auto& synced_by = reg_ptr->emplace(msg_e).list; synced_by.emplace(c_self); } - _rmm.throwEventConstruct(*reg_ptr, e); + _rmm.throwEventConstruct(*reg_ptr, msg_e); + + // TODO: place in iterate? + updateMessages(ce); return true; } diff --git a/src/sha1_ngcft1.hpp b/src/sha1_ngcft1.hpp index 3a90d05..e910914 100644 --- a/src/sha1_ngcft1.hpp +++ b/src/sha1_ngcft1.hpp @@ -10,28 +10,36 @@ #include "./ft1_sha1_info.hpp" +#include +#include #include #include +enum class Content : uint32_t {}; +using ContentRegistry = entt::basic_registry; +using ContentHandle = entt::basic_handle; + class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { Contact3Registry& _cr; RegistryMessageModel& _rmm; NGCFT1& _nft; ToxContactModel2& _tcm; + // registry per group? + ContentRegistry _contentr; // limit this to each group? - entt::dense_map _info_to_message; + entt::dense_map _info_to_content; // sha1 chunk index // TODO: optimize lookup - entt::dense_map _chunks; + entt::dense_map _chunks; - // group_number, peer_number, message, chunk_hash, timer - std::deque> _queue_requested_chunk; + // group_number, peer_number, content, chunk_hash, timer + std::deque> _queue_requested_chunk; //void queueUpRequestInfo(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash); - void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, Message3Handle msg, const SHA1Digest& hash); + void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash); struct SendingTransfer { struct Info { @@ -41,7 +49,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { }; struct Chunk { - Message3Handle msg; + ContentHandle content; uint64_t offset_into_file; // or data? // if memmapped, this would be just a pointer @@ -54,8 +62,32 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { // key is groupid + peerid entt::dense_map> _sending_transfers; + struct ReceivingTransfer { + struct Info { + // copy of info data + // too large? + std::vector info_data; + // content? + }; + + struct Chunk { + ContentHandle content; + uint64_t offset_into_file; + // or data? + // if memmapped, this would be just a pointer + }; + + std::variant v; + + float time_since_activity {0.f}; + }; + // key is groupid + peerid + entt::dense_map> _receiving_transfers; + static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number); + void updateMessages(ContentHandle ce); + public: // TODO: config bool _udp_only {false};