#include "./sha1_ngcft1.hpp" #include #include #include #include #include #include #include "./ft1_sha1_info.hpp" #include "./hash_utils.hpp" #include #include #include "./file_rw_mapped.hpp" #include #include #include #include #include namespace Message::Components { using Content = ContentHandle; } // Message::Components // TODO: rename to content components namespace Components { struct Messages { std::vector messages; }; using FT1InfoSHA1 = FT1InfoSHA1; struct FT1InfoSHA1Data { std::vector data; }; struct FT1InfoSHA1Hash { std::vector hash; }; struct FT1ChunkSHA1Cache { std::vector have_chunk; bool have_all {false}; size_t have_count {0}; entt::dense_map> chunk_hash_to_index; std::vector chunkIndices(const SHA1Digest& hash) const; bool haveChunk(const SHA1Digest& hash) const; }; struct FT1ChunkSHA1Requested { // requested chunks with a timer since last request entt::dense_map chunks; }; struct SuspectedParticipants { entt::dense_set participants; }; struct ReRequestInfoTimer { float timer {0.f}; }; struct ReadHeadHint { // points to the first byte we want // this is just a hint, that can be set from outside // to guide the sequential "piece picker" strategy // the strategy *should* set this to the first byte we dont yet have uint64_t offset_into_file {0u}; }; } // Components std::vector Components::FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const { const auto it = chunk_hash_to_index.find(hash); if (it != chunk_hash_to_index.cend()) { return it->second; } else { return {}; } } bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const { if (have_all) { // short cut return true; } if (auto i_vec = chunkIndices(hash); !i_vec.empty()) { // TODO: should i test all? return have_chunk[i_vec.front()]; } // not part of this file return false; } static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) { if (chunk_index+1 == sha1_info.chunks.size()) { // last chunk return sha1_info.file_size - chunk_index * sha1_info.chunk_size; } else { return sha1_info.chunk_size; } } void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash) { // TODO: transfers for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) { // if already in queue if (i_g == group_number && i_p == peer_number && i_h == hash) { // update timer i_t = 0.f; return; } } // not in queue yet _queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, content, hash, 0.f)); } uint64_t SHA1_NGCFT1::combineIds(const uint32_t group_number, const uint32_t peer_number) { return (uint64_t(group_number) << 32) | peer_number; } void SHA1_NGCFT1::updateMessages(ContentHandle ce) { assert(ce.all_of()); for (auto msg : ce.get().messages) { if (ce.all_of() && !msg.all_of()) { msg.emplace(ce.get()); } if (ce.all_of()) { msg.emplace_or_replace(ce.get()); } if (ce.all_of()) { msg.emplace_or_replace(ce.get()); } if (ce.all_of()) { msg.emplace_or_replace(ce.get()); } if (ce.all_of()) { msg.emplace_or_replace(); } else { msg.remove(); } if (auto* cc = ce.try_get(); cc != nullptr && cc->have_all) { msg.emplace_or_replace(); } _rmm.throwEventUpdate(msg); } } std::optional> SHA1_NGCFT1::selectPeerForRequest(ContentHandle ce) { // get a list of peers we can request this file from // TODO: randomly request from non SuspectedParticipants std::vector> tox_peers; for (const auto c : ce.get().participants) { // TODO: sort by con state? // prio to direct? 
		if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
			continue;
		}

		if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
			const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
			tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
		}
	}

	// 1 in 20 chance to ask random peer instead
	// TODO: config + tweak
	// TODO: save group in content to avoid the tox_peers list build
	if (tox_peers.empty() || (_rng()%20) == 0) {
		// meh
		// HACK: determine group based on last tox_peers
		if (!tox_peers.empty()) {
			const uint32_t group_number = tox_peers.back().first;
			auto gch = _tcm.getContactGroup(group_number);
			assert(static_cast<bool>(gch));

			std::vector<uint32_t> un_tox_peers;
			for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
				if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
					continue;
				}

				if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
					const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
					un_tox_peers.push_back(tgpe.peer_number);
				}
			}
			if (un_tox_peers.empty()) {
				// no one online, we are out of luck
			} else {
				const size_t sample_i = _rng()%un_tox_peers.size();
				const auto peer_number = un_tox_peers.at(sample_i);

				return std::make_pair(group_number, peer_number);
			}
		}
	} else {
		const size_t sample_i = _rng()%tox_peers.size();
		const auto [group_number, peer_number] = tox_peers.at(sample_i);

		return std::make_pair(group_number, peer_number);
	}

	return std::nullopt;
}

SHA1_NGCFT1::SHA1_NGCFT1(
	Contact3Registry& cr,
	RegistryMessageModel& rmm,
	NGCFT1& nft,
	ToxContactModel2& tcm
) :
	_cr(cr),
	_rmm(rmm),
	_nft(nft),
	_tcm(tcm)
{
	// TODO: also create and destroy
	_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);

	_nft.subscribe(this, NGCFT1_Event::recv_request);
	_nft.subscribe(this, NGCFT1_Event::recv_init);
	_nft.subscribe(this, NGCFT1_Event::recv_data);
	_nft.subscribe(this, NGCFT1_Event::send_data);
	_nft.subscribe(this, NGCFT1_Event::recv_done);
	_nft.subscribe(this, NGCFT1_Event::send_done);
	_nft.subscribe(this, NGCFT1_Event::recv_message);

	//_rmm.subscribe(this, RegistryMessageModel_Event::message_construct);
	//_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
	//_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);

	_rmm.subscribe(this, RegistryMessageModel_Event::send_file_path);
}

void SHA1_NGCFT1::iterate(float delta) {
	// info builder queue
	if (_info_builder_dirty) {
		std::lock_guard l{_info_builder_queue_mutex};
		_info_builder_dirty = false; // set while holding lock

		for (auto& it : _info_builder_queue) {
			//it.fn();
			it();
		}
		_info_builder_queue.clear();
	}

	{ // timers
		// sending transfers
		for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) {
			for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
				it->second.time_since_activity += delta;

				// if we have not heard for 10sec, timeout
				if (it->second.time_since_activity >= 10.f) {
					std::cerr << "SHA1_NGCFT1 warning: sending transfer timed out " << "." << int(it->first) << "\n";
					it = peer_it->second.erase(it);
				} else {
					it++;
				}
			}

			if (peer_it->second.empty()) {
				// cleanup unused peers too aggressive?
				peer_it = _sending_transfers.erase(peer_it);
			} else {
				peer_it++;
			}
		}

		// receiving transfers
		for (auto peer_it = _receiving_transfers.begin(); peer_it != _receiving_transfers.end();) {
			for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
				it->second.time_since_activity += delta;

				// if we have not heard for 10sec, timeout
				if (it->second.time_since_activity >= 10.f) {
					std::cerr << "SHA1_NGCFT1 warning: receiving transfer timed out " << "." << int(it->first) << "\n";
<< int(it->first) << "\n"; // TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading //it->second.v it = peer_it->second.erase(it); } else { it++; } } if (peer_it->second.empty()) { // cleanup unused peers too agressive? peer_it = _receiving_transfers.erase(peer_it); } else { peer_it++; } } // queued requests for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) { float& timer = std::get(*it); timer += delta; if (timer >= 10.f) { it = _queue_requested_chunk.erase(it); } else { it++; } } { // requested info timers std::vector timed_out; _contentr.view().each([delta, &timed_out](Content e, Components::ReRequestInfoTimer& rrit) { rrit.timer += delta; // 15sec, TODO: config if (rrit.timer >= 15.f) { timed_out.push_back(e); } }); for (const auto e : timed_out) { // TODO: avoid dups _queue_content_want_info.push_back({_contentr, e}); _contentr.remove(e); } } { // requested chunk timers _contentr.view().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) { for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) { it->second += delta; // 20sec, TODO: config if (it->second >= 20.f) { it = ftchunk_requested.chunks.erase(it); } else { it++; } } }); } } // if we have not reached the total cap for transfers // count running transfers size_t running_sending_transfer_count {0}; for (const auto& [_, transfers] : _sending_transfers) { running_sending_transfer_count += transfers.size(); } size_t running_receiving_transfer_count {0}; for (const auto& [_, transfers] : _receiving_transfers) { running_receiving_transfer_count += transfers.size(); } if (running_sending_transfer_count < _max_concurrent_out) { // TODO: for each peer? transfer cap per peer? // TODO: info queue if (!_queue_requested_chunk.empty()) { // then check for chunk requests const auto [group_number, peer_number, ce, chunk_hash, _] = _queue_requested_chunk.front(); auto chunk_idx_vec = ce.get().chunkIndices(chunk_hash); if (!chunk_idx_vec.empty()) { // check if already sending bool already_sending_to_this_peer = false; if (_sending_transfers.count(combineIds(group_number, peer_number))) { for (const auto& [_2, t] : _sending_transfers.at(combineIds(group_number, peer_number))) { if (std::holds_alternative(t.v)) { const auto& v = std::get(t.v); if (v.content == ce && v.chunk_index == chunk_idx_vec.front()) { // already sending already_sending_to_this_peer = true; break; } } } } if (!already_sending_to_this_peer) { const auto& info = ce.get(); uint8_t transfer_id {0}; if (_nft.NGC_FT1_send_init_private( group_number, peer_number, static_cast(NGCFT1_file_kind::HASH_SHA1_CHUNK), chunk_hash.data.data(), chunk_hash.size(), chunkSize(info, chunk_idx_vec.front()), &transfer_id )) { _sending_transfers [combineIds(group_number, peer_number)] [transfer_id] // TODO: also save index? 
					}
				} // else just remove from queue
			}

			// remove from queue regardless
			_queue_requested_chunk.pop_front();
		}
	}

	if (running_receiving_transfer_count < _max_concurrent_in) {
		// strictly prioritize info
		if (!_queue_content_want_info.empty()) {
			const auto ce = _queue_content_want_info.front();

			// make sure we are missing the info
			assert(!ce.all_of<Components::ReRequestInfoTimer>());
			assert(!ce.all_of<Components::FT1InfoSHA1>());
			assert(!ce.all_of<Components::FT1InfoSHA1Data>());
			assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
			assert(ce.all_of<Components::FT1InfoSHA1Hash>());

			auto selected_peer_opt = selectPeerForRequest(ce);
			if (selected_peer_opt.has_value()) {
				const auto [group_number, peer_number] = selected_peer_opt.value();

				//const auto& info = msg.get<Message::Components::Transfer::FileInfo>();
				const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;

				_nft.NGC_FT1_send_request_private(
					group_number, peer_number,
					static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
					info_hash.data(), info_hash.size()
				);
				ce.emplace<Components::ReRequestInfoTimer>(0.f);

				_queue_content_want_info.pop_front();

				std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
			}
		} else if (!_queue_content_want_chunk.empty()) {
			const auto ce = _queue_content_want_chunk.front();

			auto& requested_chunks = ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
			if (requested_chunks.size() < _max_pending_requests) {

				// select chunk/make sure we still need one
				auto selected_peer_opt = selectPeerForRequest(ce);
				if (selected_peer_opt.has_value()) {
					const auto [group_number, peer_number] = selected_peer_opt.value();
					//std::cout << "SHA1_NGCFT1: should ask " << group_number << ":" << peer_number << " for content here\n";

					auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
					const auto& info = ce.get<Components::FT1InfoSHA1>();

					// naive, choose first chunk we dont have (double requests!!)
					for (size_t chunk_idx = 0; chunk_idx < cc.have_chunk.size(); chunk_idx++) {
						if (cc.have_chunk[chunk_idx]) {
							continue;
						}

						// check by hash
						if (cc.haveChunk(info.chunks.at(chunk_idx))) {
							// TODO: fix this, a completed chunk should fill all the indices it occupies
							cc.have_chunk[chunk_idx] = true;
							cc.have_count += 1;
							if (cc.have_count == info.chunks.size()) {
								cc.have_all = true;
								cc.have_chunk.clear();
								break;
							}
							continue;
						}

						if (requested_chunks.count(chunk_idx)) {
							// already requested
							continue;
						}

						// request chunk_idx
						_nft.NGC_FT1_send_request_private(
							group_number, peer_number,
							static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
							info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size()
						);
						requested_chunks[chunk_idx] = 0.f;
						std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n";

						break;
					}

					// ...
					// TODO: properly determine
					if (!cc.have_all) {
						_queue_content_want_chunk.push_back(ce);
					}
					_queue_content_want_chunk.pop_front();
				}
			}
		}
	}
}

bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
	// see tox_transfer_manager.cpp for reference
	if (!e.e.all_of<Message::Components::Transfer::ActionAccept, Message::Components::Content>()) {
		return false;
	}

	//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path);
	auto ce = e.e.get<Message::Components::Content>();

	//if (!ce.all_of<Components::FT1InfoSHA1, Components::FT1ChunkSHA1Cache>()) {
	if (!ce.all_of<Components::FT1InfoSHA1>()) {
		// not ready to load yet, skip
		return false;
	}
	assert(!ce.all_of<Components::FT1ChunkSHA1Cache>());
	assert(!ce.all_of<Message::Components::Transfer::File>());

	// first, open file for write(+readback)
	std::string full_file_path{e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path};
	// TODO: replace with filesystem or something
	// TODO: ensure dir exists
	if (full_file_path.back() != '/') {
		full_file_path += "/";
	}
	std::filesystem::create_directories(full_file_path);

	const auto& info = ce.get<Components::FT1InfoSHA1>();
	full_file_path += info.file_name;

	ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{full_file_path});

	std::unique_ptr<FileI> file_impl;
	const bool file_exists = std::filesystem::exists(full_file_path);
	file_impl = std::make_unique<FileRWMapped>(full_file_path, info.file_size);

	if (!file_impl->isGood()) {
		std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
		//e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop
		return false;
	}

	{ // next, create chunk cache and check for existing data
		auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
		auto& bytes_received = ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total;
		cc.have_all = false;
		cc.have_count = 0;

		cc.chunk_hash_to_index.clear(); // if copy pasta

		if (file_exists) {
			// iterate existing file
			for (size_t i = 0; i < info.chunks.size(); i++) {
				const uint64_t chunk_size = info.chunkSize(i);
				auto existing_data = file_impl->read(i*uint64_t(info.chunk_size), chunk_size);
				assert(existing_data.size() == chunk_size);

				// TODO: avoid copy
				const auto data_hash = SHA1Digest{hash_sha1(existing_data.data(), existing_data.size())};
				const bool data_equal = data_hash == info.chunks.at(i);

				cc.have_chunk.push_back(data_equal);

				if (data_equal) {
					cc.have_count += 1;
					bytes_received += chunk_size;
					//std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n";
				} else {
					//std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n";
				}

				_chunks[info.chunks[i]] = ce;
				cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
			}
			std::cout << "preexisting " << cc.have_count << "/" << info.chunks.size() << "\n";

			if (cc.have_count >= info.chunks.size()) {
				cc.have_all = true;
				//ce.remove<Message::Components::Transfer::BytesReceived>();
			}
		} else {
			for (size_t i = 0; i < info.chunks.size(); i++) {
				cc.have_chunk.push_back(false);
				_chunks[info.chunks[i]] = ce;
				cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
			}
		}

		if (!cc.have_all) {
			// now, enqueue
			_queue_content_want_chunk.push_back(ce);
		}
	}

	ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));

	ce.remove<Message::Components::Transfer::TagPaused>();

	// should?
	e.e.remove<Message::Components::Transfer::ActionAccept>();

	updateMessages(ce);

	return false;
}

bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
	// only interested in sha1
	if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) {
		return false;
	}

	//std::cout << "SHA1_NGCFT1: FT1_REQUEST fk:" << int(e.file_kind) << " [" << bin2hex({e.file_id, e.file_id+e.file_id_size}) << "]\n";

	if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) {
		if (e.file_id_size != 20) {
			// error
			return false;
		}

		SHA1Digest info_hash{e.file_id, e.file_id_size};
		if (!_info_to_content.count(info_hash)) {
			// we dont know about this
			return false;
		}

		auto content = _info_to_content.at(info_hash);

		if (!content.all_of<Components::FT1InfoSHA1Data>()) {
			// we dont have the info for that infohash (yet?)
			return false;
		}

		// TODO: queue instead
		//queueUpRequestInfo(e.group_number, e.peer_number, info_hash);
		uint8_t transfer_id {0};
		_nft.NGC_FT1_send_init_private(
			e.group_number, e.peer_number,
			static_cast<uint32_t>(e.file_kind),
			e.file_id, e.file_id_size,
			content.get<Components::FT1InfoSHA1Data>().data.size(),
			&transfer_id
		);

		_sending_transfers
			[combineIds(e.group_number, e.peer_number)]
			[transfer_id]
				.v = SendingTransfer::Info{content.get<Components::FT1InfoSHA1Data>().data};
	} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
		if (e.file_id_size != 20) {
			// error
			return false;
		}

		SHA1Digest chunk_hash{e.file_id, e.file_id_size};

		if (!_chunks.count(chunk_hash)) {
			// we dont know about this
			return false;
		}

		auto ce = _chunks.at(chunk_hash);

		{ // they advertise interest in the content
			const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
			ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
		}

		assert(ce.all_of<Components::FT1ChunkSHA1Cache>());

		if (!ce.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
			// we dont have the chunk
			return false;
		}

		// queue good request
		queueUpRequestChunk(e.group_number, e.peer_number, ce, chunk_hash);
	} else {
		assert(false && "unhandled case");
	}

	return true;
}

bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
	// only interested in sha1
	if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) {
		return false;
	}

	// TODO: make sure we requested this?

	if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) {
		SHA1Digest sha1_info_hash {e.file_id, e.file_id_size};
		if (!_info_to_content.count(sha1_info_hash)) {
			// no idea about this content
			return false;
		}

		auto ce = _info_to_content.at(sha1_info_hash);

		if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data>()) {
			// we already have the info (should)
			return false;
		}

		// TODO: check if e.file_size too large / ask for permission
		if (e.file_size > 100*1024*1024) {
			// an info size of 100MiB is ~640GiB for a 128KiB chunk size (default)
			return false;
		}

		_receiving_transfers
			[combineIds(e.group_number, e.peer_number)]
			[e.transfer_id]
				.v = ReceivingTransfer::Info{ce, std::vector<uint8_t>(e.file_size)};

		e.accept = true;
	} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
		SHA1Digest sha1_chunk_hash {e.file_id, e.file_id_size};

		if (!_chunks.count(sha1_chunk_hash)) {
			// no idea about this content
			return false;
		}

		auto ce = _chunks.at(sha1_chunk_hash);

		// CHECK IF TRANSFER IN PROGRESS!!
		{ // they have the content (probably, might be fake, should move this to done)
			const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
			ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);
		}

		assert(ce.all_of<Components::FT1InfoSHA1>());
		assert(ce.all_of<Components::FT1ChunkSHA1Cache>());

		const auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
		if (cc.haveChunk(sha1_chunk_hash)) {
			std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
			// we have the chunk
			return false;
		}
		// TODO: cache position

		// calc offset_into_file
		auto idx_vec = cc.chunkIndices(sha1_chunk_hash);
		assert(!idx_vec.empty());

		const auto& info = ce.get<Components::FT1InfoSHA1>();

		// TODO: check e.file_size
		assert(e.file_size == info.chunkSize(idx_vec.front()));

		_receiving_transfers
			[combineIds(e.group_number, e.peer_number)]
			[e.transfer_id]
				.v = ReceivingTransfer::Chunk{ce, idx_vec};

		e.accept = true;

		std::cout << "SHA1_NGCFT1: accepted chunk [" << SHA1Digest{sha1_chunk_hash} << "]\n";
	} else {
		assert(false && "unhandled case");
	}

	return true;
}

bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
	if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
		return false;
	}

	auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
	if (!peer_transfers.count(e.transfer_id)) {
		return false;
	}

	auto& tv = peer_transfers[e.transfer_id].v;
	peer_transfers[e.transfer_id].time_since_activity = 0.f;
	if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
		auto& info_data = std::get<ReceivingTransfer::Info>(tv).info_data;
		for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
			info_data[i+e.data_offset] = e.data[i];
		}
	} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
		auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;

		assert(ce.all_of<Message::Components::Transfer::File>());
		auto* file = ce.get<Message::Components::Transfer::File>().get();
		assert(file != nullptr);

		for (const auto chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
			const auto offset_into_file = chunk_index * ce.get<Components::FT1InfoSHA1>().chunk_size;

			// TODO: avoid temporary copy
			// TODO: check return
			if (!file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size})) {
				std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
			}
		}
	} else {
		assert(false && "unhandled case");
	}

	return true;
}

bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
	if (!_sending_transfers.count(combineIds(e.group_number, e.peer_number))) {
		return false;
	}

	auto& peer = _sending_transfers.at(combineIds(e.group_number, e.peer_number));

	if (!peer.count(e.transfer_id)) {
		return false;
	}

	auto& transfer = peer.at(e.transfer_id);
	transfer.time_since_activity = 0.f;
	if (std::holds_alternative<SendingTransfer::Info>(transfer.v)) {
		auto& info_transfer = std::get<SendingTransfer::Info>(transfer.v);
		for (size_t i = 0; i < e.data_size && (i + e.data_offset) < info_transfer.info_data.size(); i++) {
			e.data[i] = info_transfer.info_data[i + e.data_offset];
		}

		if (e.data_offset + e.data_size >= info_transfer.info_data.size()) {
			// was last read (probably TODO: add transfer destruction event)
			peer.erase(e.transfer_id);
		}
	} else if (std::holds_alternative<SendingTransfer::Chunk>(transfer.v)) {
		auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
		const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
		// TODO: should we really use file?
		const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read((chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset, e.data_size);

		// TODO: optimize
		for (size_t i = 0; i < e.data_size && i < data.size(); i++) {
			e.data[i] = data[i];
		}

		chunk_transfer.content.get_or_emplace<Message::Components::Transfer::BytesSent>().total += data.size();
		// TODO: add event to propagate to messages
		//_rmm.throwEventUpdate(transfer); // should we?
		//if (e.data_offset + e.data_size >= *insert chunk size here*) {
			//// was last read (probably TODO: add transfer destruction event)
			//peer.erase(e.transfer_id);
		//}
	} else {
		assert(false && "not implemented?");
	}

	return true;
}

bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
	if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
		return false;
	}

	auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
	if (!peer_transfers.count(e.transfer_id)) {
		return false;
	}

	const auto& tv = peer_transfers[e.transfer_id].v;
	if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
		auto& info = std::get<ReceivingTransfer::Info>(tv);
		auto ce = info.content;

		if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data>()) {
			// we already have the info, discard
			peer_transfers.erase(e.transfer_id);
			return true;
		}

		// check if data matches hash
		auto hash = hash_sha1(info.info_data.data(), info.info_data.size());

		assert(ce.all_of<Components::FT1InfoSHA1Hash>());
		if (ce.get<Components::FT1InfoSHA1Hash>().hash != hash) {
			std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n";
			// requeue info request
			peer_transfers.erase(e.transfer_id);
			return true;
		}

		const auto& info_data = ce.emplace_or_replace<Components::FT1InfoSHA1Data>(std::move(info.info_data)).data;
		auto& ft_info = ce.emplace_or_replace<Components::FT1InfoSHA1>();
		ft_info.fromBuffer(info_data);

		{ // file info
			// TODO: not overwrite fi? since same?
			auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
			file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
			file_info.total_size = ft_info.file_size;
		}

		std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";

		ce.remove<Components::ReRequestInfoTimer>();
		if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) {
			_queue_content_want_info.erase(it);
		}

		ce.emplace_or_replace<Message::Components::Transfer::TagPaused>();

		updateMessages(ce);
	} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
		auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;

		const auto& info = ce.get<Components::FT1InfoSHA1>();
		auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();

		// HACK: only check first chunk (they *should* all be the same)
		const auto chunk_index = std::get<ReceivingTransfer::Chunk>(tv).chunk_indices.front();
		const uint64_t offset_into_file = chunk_index * uint64_t(info.chunk_size);

		assert(chunk_index < info.chunks.size());
		const auto chunk_size = info.chunkSize(chunk_index);
		assert(offset_into_file+chunk_size <= info.file_size);

		const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(offset_into_file, chunk_size);

		// check hash of chunk
		auto got_hash = hash_sha1(chunk_data.data(), chunk_data.size());
		if (info.chunks.at(chunk_index) == got_hash) {
			std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";

			if (!cc.have_all) {
				for (const auto inner_chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
					if (!cc.have_all && !cc.have_chunk.at(inner_chunk_index)) {
						cc.have_chunk.at(inner_chunk_index) = true;
						cc.have_count += 1;
						if (cc.have_count == info.chunks.size()) {
							// debug check
							for ([[maybe_unused]] const bool it : cc.have_chunk) {
								assert(it);
							}

							cc.have_all = true;
							cc.have_chunk.clear(); // not wasting memory
							std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";

							// HACK: remap file, to clear ram
							// TODO: error checking
							ce.get<Message::Components::Transfer::File>() = std::make_unique<FileRWMapped>(
								ce.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(),
								info.file_size
							);
						}

						// good chunk
						// TODO: have wasted + metadata
						ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size();
					}
				}
			} else {
				std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
			}
		} else {
			// bad chunk
			std::cout << "SHA1_NGCFT1: got BAD chunk from " << e.group_number << ":" << e.peer_number << " [" << info.chunks.at(chunk_index) << "] ; instead got [" << SHA1Digest{got_hash} << "]\n";
		}

		// remove from requested
		// TODO: remove at init and track running transfers differently
		for (const auto it : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
			ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks.erase(it);
		}

		updateMessages(ce); // mostly for received bytes
	}

	peer_transfers.erase(e.transfer_id);

	return true;
}

bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
	if (!_sending_transfers.count(combineIds(e.group_number, e.peer_number))) {
		return false;
	}

	auto& peer_transfers = _sending_transfers.at(combineIds(e.group_number, e.peer_number));
	if (!peer_transfers.count(e.transfer_id)) {
		return false;
	}

	const auto& tv = peer_transfers[e.transfer_id].v;
	if (std::holds_alternative<SendingTransfer::Chunk>(tv)) {
		updateMessages(std::get<SendingTransfer::Chunk>(tv).content); // mostly for sent bytes
	}

	peer_transfers.erase(e.transfer_id);

	return true;
}

bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
	if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO) {
		return false;
	}

	uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();

	const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
	const auto self_c = c.get<Contact::Components::Self>().self;

	auto* reg_ptr = _rmm.get(c);
	if (reg_ptr == nullptr) {
		std::cerr << "SHA1_NGCFT1 error: cant find reg\n";
		return false;
	}

	Message3Registry& reg = *reg_ptr;
	// TODO: check for existence, hs or other syncing mechanics might have sent it already (or like, it arrived 2x or whatever)
	auto new_msg_e = reg.create();

	{ // contact
		// from
		reg.emplace<Message::Components::ContactFrom>(new_msg_e, c);
		// to
		reg.emplace<Message::Components::ContactTo>(new_msg_e, c.get<Contact::Components::Parent>().parent);
	}

	reg.emplace<Message::Components::ToxGroupMessageID>(new_msg_e, e.message_id);

	reg.emplace<Message::Components::Transfer::TagReceiving>(new_msg_e); // add sending?

	reg.emplace<Message::Components::TimestampProcessed>(new_msg_e, ts);
	//reg.emplace<Message::Components::TimestampWritten>(new_msg_e, 0);
	reg.emplace<Message::Components::Timestamp>(new_msg_e, ts); // reactive?

	reg.emplace<Message::Components::TagUnread>(new_msg_e);

	{ // by whom
		auto& synced_by = reg.get_or_emplace<Message::Components::SyncedBy>(new_msg_e).list;
		synced_by.emplace(self_c);
	}

	// check if content exists
	const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
	ContentHandle ce;
	if (_info_to_content.count(sha1_info_hash)) {
		ce = _info_to_content.at(sha1_info_hash);
		std::cout << "SHA1_NGCFT1: new message has existing content\n";
	} else {
		ce = {_contentr, _contentr.create()};
		_info_to_content[sha1_info_hash] = ce;
		std::cout << "SHA1_NGCFT1: new message has new content\n";

		//ce.emplace<Components::FT1InfoSHA1>(sha1_info);
		//ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
		ce.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);

		//{ // lookup tables and have
			//auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
			//cc.have_all = true;
			//// skip have vec, since all
			////cc.have_chunk
			//cc.have_count = sha1_info.chunks.size(); // need?

			//_info_to_content[sha1_info_hash] = ce;
			//for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
				//_chunks[sha1_info.chunks[i]] = ce;
				//cc.chunk_hash_to_index[sha1_info.chunks[i]] = i;
			//}
		//}

		// TODO: ft1 specific comp
		//ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
	}
	ce.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
	reg_ptr->emplace<Message::Components::Content>(new_msg_e, ce);

	ce.get_or_emplace<Components::SuspectedParticipants>().participants.emplace(c);

	if (!ce.all_of<Components::ReRequestInfoTimer>() && !ce.all_of<Components::FT1InfoSHA1>()) {
		// TODO: check if already receiving
		_queue_content_want_info.push_back(ce);
	}

	// TODO: queue info dl
	//reg_ptr->emplace<Components::FT1InfoSHA1>(e, sha1_info);
	//reg_ptr->emplace<Components::FT1InfoSHA1Data>(e, sha1_info_data); // keep around? or file?
	//reg.emplace<Components::FT1InfoSHA1Hash>(new_msg_e, std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size});
	if (auto* cc = ce.try_get<Components::FT1ChunkSHA1Cache>(); cc != nullptr && cc->have_all) {
		reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(new_msg_e);
	}

	if (ce.all_of<Message::Components::Transfer::FileInfo>()) {
		reg_ptr->emplace<Message::Components::Transfer::FileInfo>(new_msg_e, ce.get<Message::Components::Transfer::FileInfo>());
	}
	if (ce.all_of<Message::Components::Transfer::FileInfoLocal>()) {
		reg_ptr->emplace<Message::Components::Transfer::FileInfoLocal>(new_msg_e, ce.get<Message::Components::Transfer::FileInfoLocal>());
	}
	if (ce.all_of<Message::Components::Transfer::BytesReceived>()) {
		reg_ptr->emplace<Message::Components::Transfer::BytesReceived>(new_msg_e, ce.get<Message::Components::Transfer::BytesReceived>());
	}

	// TODO: queue info/check if we already have info

	_rmm.throwEventConstruct(reg, new_msg_e);

	return true; // false?
}

bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) {
	if (
		// TODO: add support of offline queuing
		!_cr.all_of<Contact::Components::ToxGroupEphemeral>(c)
	) {
		return false;
	}

	std::cout << "SHA1_NGCFT1: got sendFilePath()\n";

	auto* reg_ptr = _rmm.get(c);
	if (reg_ptr == nullptr) {
		return false;
	}

	// get current time unix epoch utc
	uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();

	std::thread(std::move([
		// copy everything
		self = this,
		ts,
		c,
		reg_ptr,
		file_name_ = std::string(file_name),
		file_path_ = std::string(file_path)
	]() mutable {
		// TODO: rw?
		// TODO: memory mapped would be king
		auto file_impl = std::make_unique<FileRFile>(file_path_);
		if (!file_impl->isGood()) {
			{
				std::lock_guard l{self->_info_builder_queue_mutex};
				self->_info_builder_queue.push_back([file_path_](){
					// back on iterate thread
					std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n";
				});
				self->_info_builder_dirty = true; // still in scope, set before mutex unlock
			}
			return;
		}

		// 1. build info by hashing all chunks

		FT1InfoSHA1 sha1_info;
		// build info
		sha1_info.file_name = file_name_;
		sha1_info.file_size = file_impl->_file_size;

		{ // build chunks
			// HACK: load file fully
			// TODO: the speed is truly horrid
			const auto file_data = file_impl->read(0, file_impl->_file_size);
			size_t i = 0;
			for (; i + sha1_info.chunk_size < file_data.size(); i += sha1_info.chunk_size) {
				sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, sha1_info.chunk_size));
			}

			if (i < file_data.size()) {
				sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, file_data.size()-i));
			}
		}

		file_impl.reset();

		{
			std::lock_guard l{self->_info_builder_queue_mutex};
			self->_info_builder_queue.push_back(std::move([
				self,
				ts,
				c,
				reg_ptr,
				file_name_,
				file_path_,
				sha1_info = std::move(sha1_info)
			]() mutable { //
				// back on iterate thread

				auto file_impl = std::make_unique<FileRWFile>(file_path_);
				if (!file_impl->isGood()) {
					std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n";
					return;
				}

				// 2. hash info
				std::vector<uint8_t> sha1_info_data;
				std::vector<uint8_t> sha1_info_hash;

				std::cout << "SHA1_NGCFT1 info is: \n" << sha1_info;
				sha1_info_data = sha1_info.toBuffer();
				std::cout << "SHA1_NGCFT1 sha1_info size: " << sha1_info_data.size() << "\n";
				sha1_info_hash = hash_sha1(sha1_info_data.data(), sha1_info_data.size());
				std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n";

				// check if content exists
				ContentHandle ce;
				if (self->_info_to_content.count(sha1_info_hash)) {
					ce = self->_info_to_content.at(sha1_info_hash);

					// TODO: check if content is incomplete and use file instead
					if (!ce.all_of<Components::FT1InfoSHA1>()) {
						ce.emplace<Components::FT1InfoSHA1>(sha1_info);
					}
					if (!ce.all_of<Components::FT1InfoSHA1Data>()) {
						ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data);
					}

					// hash has to be set already
					// Components::FT1InfoSHA1Hash

					{ // lookup tables and have
						auto& cc = ce.get_or_emplace<Components::FT1ChunkSHA1Cache>();
						cc.have_all = true;
						// skip have vec, since all
						//cc.have_chunk
						cc.have_count = sha1_info.chunks.size(); // need?
						self->_info_to_content[sha1_info_hash] = ce;
						cc.chunk_hash_to_index.clear(); // for cpy pst
						for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
							self->_chunks[sha1_info.chunks[i]] = ce;
							cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
						}
					}

					{ // file info
						// TODO: not overwrite fi? since same?
						auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
						file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
						file_info.total_size = file_impl->_file_size;

						ce.emplace_or_replace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
					}

					// cleanup file
					if (ce.all_of<Message::Components::Transfer::File>()) {
						// replace
						ce.remove<Message::Components::Transfer::File>();
					}
					ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));

					if (!ce.all_of<Message::Components::Transfer::BytesSent>()) {
						ce.emplace<Message::Components::Transfer::BytesSent>(0u);
					}

					ce.remove<Message::Components::Transfer::TagPaused>();

					// we dont want the info anymore
					ce.remove<Components::ReRequestInfoTimer>();
					if (auto it = std::find(self->_queue_content_want_info.begin(), self->_queue_content_want_info.end(), ce); it != self->_queue_content_want_info.end()) {
						self->_queue_content_want_info.erase(it);
					}

					// TODO: we dont want chunks anymore
					// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
					auto it = self->_queue_content_want_chunk.begin();
					while (
						it != self->_queue_content_want_chunk.end() &&
						(it = std::find(it, self->_queue_content_want_chunk.end(), ce)) != self->_queue_content_want_chunk.end()
					) {
						it = self->_queue_content_want_chunk.erase(it);
					}
				} else {
					ce = {self->_contentr, self->_contentr.create()};
					self->_info_to_content[sha1_info_hash] = ce;

					ce.emplace<Components::FT1InfoSHA1>(sha1_info);
					ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
					ce.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);

					{ // lookup tables and have
						auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
						cc.have_all = true;
						// skip have vec, since all
						//cc.have_chunk
						cc.have_count = sha1_info.chunks.size(); // need?

						self->_info_to_content[sha1_info_hash] = ce;
						cc.chunk_hash_to_index.clear(); // for cpy pst
						for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
							self->_chunks[sha1_info.chunks[i]] = ce;
							cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
						}
					}

					{ // file info
						auto& file_info = ce.emplace<Message::Components::Transfer::FileInfo>();
						//const auto& file = ce.get();
						file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
						file_info.total_size = file_impl->_file_size;

						ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
					}

					ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));

					ce.emplace<Message::Components::Transfer::BytesSent>(0u);
				}

				const auto c_self = self->_cr.get<Contact::Components::Self>(c).self;
				if (!self->_cr.valid(c_self)) {
					std::cerr << "SHA1_NGCFT1 error: failed to get self!\n";
					return;
				}

				const auto msg_e = reg_ptr->create();
				reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
				reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
				reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
				reg_ptr->emplace<Message::Components::TimestampProcessed>(msg_e, ts);

				reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e);
				reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);

				ce.get_or_emplace<Components::Messages>().messages.push_back({*reg_ptr, msg_e});

				//reg_ptr->emplace(e, file_kind); // file id would be sha1_info hash or something
				//reg_ptr->emplace(e, file_id);

				// remove? done in updateMessages() anyway
				if (ce.all_of<Message::Components::Transfer::FileInfo>()) {
					reg_ptr->emplace<Message::Components::Transfer::FileInfo>(msg_e, ce.get<Message::Components::Transfer::FileInfo>());
				}
				if (ce.all_of<Message::Components::Transfer::FileInfoLocal>()) {
					reg_ptr->emplace<Message::Components::Transfer::FileInfoLocal>(msg_e, ce.get<Message::Components::Transfer::FileInfoLocal>());
				}
				if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
					reg_ptr->emplace<Message::Components::Transfer::BytesSent>(msg_e, ce.get<Message::Components::Transfer::BytesSent>());
				}

				// TODO: determine if this is true
				//reg_ptr->emplace<Message::Components::Transfer::TagPaused>(e);

				if (self->_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
					const uint32_t group_number = self->_cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
					uint32_t message_id = 0;

					// TODO: check return
					self->_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size());

					reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);

					// TODO: generalize?
					auto& synced_by = reg_ptr->emplace<Message::Components::SyncedBy>(msg_e).list;
					synced_by.emplace(c_self);
				} else if (
					// non online group
					self->_cr.any_of<Contact::Components::ToxGroupPersistent>(c)
				) {
					// create msg_id
					const uint32_t message_id = randombytes_random();
					reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);

					// TODO: generalize?
					auto& synced_by = reg_ptr->emplace<Message::Components::SyncedBy>(msg_e).list;
					synced_by.emplace(c_self);
				}

				self->_rmm.throwEventConstruct(*reg_ptr, msg_e);

				// TODO: place in iterate?
				self->updateMessages(ce);

			}));
			self->_info_builder_dirty = true; // still in scope, set before mutex unlock
		}
	})).detach();

	return true;
}