#include "./sha1_ngcft1.hpp" #include #include #include #include #include #include #include "./util.hpp" #include "./ft1_sha1_info.hpp" #include "./hash_utils.hpp" #include #include #include "./file_constructor.hpp" #include "./components.hpp" #include "./contact_components.hpp" #include "./chunk_picker.hpp" #include "./participation.hpp" #include "./re_announce_systems.hpp" #include "./chunk_picker_systems.hpp" #include "./transfer_stats_systems.hpp" #include #include #include static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) { if (chunk_index+1 == sha1_info.chunks.size()) { // last chunk return sha1_info.file_size - chunk_index * sha1_info.chunk_size; } else { return sha1_info.chunk_size; } } void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle obj, const SHA1Digest& hash) { for (auto& [i_g, i_p, i_o, i_h, i_t] : _queue_requested_chunk) { // if already in queue if (i_g == group_number && i_p == peer_number && i_h == hash) { // update timer i_t = 0.f; return; } } // check for running transfer auto chunk_idx_vec = obj.get().chunkIndices(hash); // list is 1 entry in 99% of cases for (const size_t chunk_idx : chunk_idx_vec) { if (_sending_transfers.containsPeerChunk(group_number, peer_number, obj, chunk_idx)) { // already sending return; // skip } } // not in queue yet _queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, obj, hash, 0.f)); } void SHA1_NGCFT1::updateMessages(ObjectHandle o) { assert(o.all_of()); for (auto msg : o.get().messages) { msg.emplace_or_replace(o); // messages no long hold this info // this should not update messages anymore but simply just update the object // and receivers should listen for object updates (?) _rmm.throwEventUpdate(msg); } } std::optional> SHA1_NGCFT1::selectPeerForRequest(ObjectHandle ce) { // get a list of peers we can request this file from std::vector> tox_peers; for (const auto c : ce.get().participants) { // TODO: sort by con state? // prio to direct? 
		if (const auto* cs = _cr.try_get(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
			continue;
		}

		if (_cr.all_of(c)) {
			const auto& tgpe = _cr.get(c);
			tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
		}
	}

	// 1 in 40 chance to ask random peer instead
	// TODO: config + tweak
	// TODO: save group in content to avoid the tox_peers list build
	// TODO: remove once pc1_announce is shipped
	if (tox_peers.empty() || (_rng()%40) == 0) {
		// meh
		// HACK: determine group based on last tox_peers
		if (!tox_peers.empty()) {
			const uint32_t group_number = tox_peers.back().first;
			auto gch = _tcm.getContactGroup(group_number);
			assert(static_cast<bool>(gch));

			std::vector<uint32_t> un_tox_peers;
			for (const auto child : gch.get().subs) {
				if (const auto* cs = _cr.try_get(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
					continue;
				}

				if (_cr.all_of(child)) {
					continue; // skip self
				}

				if (_cr.all_of(child)) {
					const auto& tgpe = _cr.get(child);
					un_tox_peers.push_back(tgpe.peer_number);
				}
			}

			if (un_tox_peers.empty()) {
				// no one online, we are out of luck
			} else {
				const size_t sample_i = _rng()%un_tox_peers.size();
				const auto peer_number = un_tox_peers.at(sample_i);

				return std::make_pair(group_number, peer_number);
			}
		}
	} else {
		const size_t sample_i = _rng()%tox_peers.size();
		const auto [group_number, peer_number] = tox_peers.at(sample_i);

		return std::make_pair(group_number, peer_number);
	}

	return std::nullopt;
}

void SHA1_NGCFT1::queueBitsetSendFull(Contact3Handle c, ObjectHandle o) {
	if (!static_cast<bool>(c) || !static_cast<bool>(o)) {
		assert(false);
		return;
	}

	// TODO: only queue if not already sent??

	if (!o.all_of()) {
		return;
	}

	_queue_send_bitset.push_back(QBitsetEntry{c, o});
}

File2I* SHA1_NGCFT1::objGetFile2Write(ObjectHandle o) {
	auto* file2_comp_ptr = o.try_get();
	if (file2_comp_ptr == nullptr || !file2_comp_ptr->file || !file2_comp_ptr->file->can_write || !file2_comp_ptr->file->isGood()) {
		// (re)request file2 from backend
		auto new_file = _mfb.file2(o, StorageBackendI::FILE2_WRITE);
		if (!new_file || !new_file->can_write || !new_file->isGood()) {
			std::cerr << "SHA1_NGCFT1 error: failed to open object for writing\n";
			return nullptr; // early out
		}
		file2_comp_ptr = &o.emplace_or_replace(std::move(new_file));
	}

	assert(file2_comp_ptr != nullptr);
	assert(static_cast<bool>(file2_comp_ptr->file));
	return file2_comp_ptr->file.get();
}

File2I* SHA1_NGCFT1::objGetFile2Read(ObjectHandle o) {
	auto* file2_comp_ptr = o.try_get();
	if (file2_comp_ptr == nullptr || !file2_comp_ptr->file || !file2_comp_ptr->file->can_read || !file2_comp_ptr->file->isGood()) {
		// (re)request file2 from backend
		auto new_file = _mfb.file2(o, StorageBackendI::FILE2_READ);
		if (!new_file || !new_file->can_read || !new_file->isGood()) {
			std::cerr << "SHA1_NGCFT1 error: failed to open object for reading\n";
			return nullptr; // early out
		}
		file2_comp_ptr = &o.emplace_or_replace(std::move(new_file));
	}

	assert(file2_comp_ptr != nullptr);
	assert(static_cast<bool>(file2_comp_ptr->file));
	return file2_comp_ptr->file.get();
}

SHA1_NGCFT1::SHA1_NGCFT1(
	ObjectStore2& os,
	Contact3Registry& cr,
	RegistryMessageModelI& rmm,
	NGCFT1& nft,
	ToxContactModel2& tcm,
	ToxEventProviderI& tep,
	NGCEXTEventProvider& neep
) :
	_os(os),
	_os_sr(_os.newSubRef(this)),
	_cr(cr),
	_rmm(rmm),
	_rmm_sr(_rmm.newSubRef(this)),
	_nft(nft),
	_nft_sr(_nft.newSubRef(this)),
	_tcm(tcm),
	_tep(tep),
	_tep_sr(_tep.newSubRef(this)),
	_neep(neep),
	_neep_sr(_neep.newSubRef(this)),
	_mfb(os)
{
	_os_sr
		// TODO: also create and destroy
		//
		.subscribe(ObjectStore_Event::object_construct)
		.subscribe(ObjectStore_Event::object_update)
		// .subscribe(ObjectStore_Event::object_destroy)
	;

	_nft_sr
		.subscribe(NGCFT1_Event::recv_request)
		.subscribe(NGCFT1_Event::recv_init)
		.subscribe(NGCFT1_Event::recv_data)
		.subscribe(NGCFT1_Event::send_data)
		.subscribe(NGCFT1_Event::recv_done)
		.subscribe(NGCFT1_Event::send_done)
		.subscribe(NGCFT1_Event::recv_message)
	;

	_rmm_sr.subscribe(RegistryMessageModel_Event::send_file_path);

	_tep_sr
		.subscribe(Tox_Event_Type::TOX_EVENT_GROUP_PEER_JOIN)
		.subscribe(Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT)
	;

	_neep_sr
		.subscribe(NGCEXT_Event::FT1_HAVE)
		.subscribe(NGCEXT_Event::FT1_BITSET)
		.subscribe(NGCEXT_Event::FT1_HAVE_ALL)
		.subscribe(NGCEXT_Event::PC1_ANNOUNCE)
	;
}

float SHA1_NGCFT1::iterate(float delta) {
	//std::cerr << "---------- new tick ----------\n";

	_mfb.tick(); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast

	entt::dense_map<Contact3, size_t> peer_open_requests;

	{ // timers
		// sending transfers
		_sending_transfers.tick(delta);

		// receiving transfers
		_receiving_transfers.tick(delta);

		// queued requests
		for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) {
			float& timer = std::get<float>(*it);
			timer += delta;

			// forget after 10sec
			if (timer >= 10.f) {
				it = _queue_requested_chunk.erase(it);
			} else {
				it++;
			}
		}

		{ // requested info timers
			std::vector<Object> timed_out;
			_os.registry().view<Components::ReRequestInfoTimer>().each([delta, &timed_out](Object ov, Components::ReRequestInfoTimer& rrit) {
				rrit.timer += delta;

				// 15sec, TODO: config
				if (rrit.timer >= 15.f) {
					timed_out.push_back(ov);
				}
			});

			for (const auto e : timed_out) {
				// TODO: avoid dups
				for (const ObjectHandle it : _queue_content_want_info) {
					assert(it != e);
				}

				auto o = _os.objectHandle(e);

				assert(!o.any_of());
				assert(!o.any_of());
				_queue_content_want_info.push_back(o);

				//_os.registry().remove(e);
				o.remove(); // TODO: throw update?
			}
		}

		{ // requested chunk timers
			_os.registry().view<Components::FT1ChunkSHA1Requested>().each([delta, &peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
				for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
					it->second.timer += delta;

					// TODO: config
					if (it->second.timer >= 60.f) {
						it = ftchunk_requested.chunks.erase(it);
					} else {
						peer_open_requests[it->second.c] += 1;
						it++;
					}
				}
			});
		}
	}

	Systems::re_announce(_os.registry(), _cr, _neep, delta);

	{ // send out bitsets
		// currently 1 per tick
		if (!_queue_send_bitset.empty()) {
			const auto& qe = _queue_send_bitset.front();

			if (static_cast<bool>(qe.o) && static_cast<bool>(qe.c) && qe.c.all_of() && qe.o.all_of()) {
				const auto [group_number, peer_number] = qe.c.get();
				const auto& info_hash = qe.o.get().hash;
				const auto& info = qe.o.get();
				const auto total_chunks = info.chunks.size();

				// 8*512 bits = 4096 chunk flags (512 bytes of bitset payload) per packet
				static constexpr size_t bits_per_packet {8u*512u};

				if (qe.o.all_of()) {
					// send have all
					_neep.send_ft1_have_all(
						group_number, peer_number,
						static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
						info_hash.data(), info_hash.size()
					);
				} else if (const auto* lhb = qe.o.try_get(); lhb != nullptr) {
					for (size_t i = 0; i < total_chunks; i += bits_per_packet) {
						size_t bits_this_packet = std::min(bits_per_packet, total_chunks-i);

						BitSet have(bits_this_packet); // default init to zero

						// TODO: optimize selective copy bitset
						for (size_t j = i; j < i+bits_this_packet; j++) {
							if (lhb->have[j]) {
								have.set(j-i);
							}
						}

						// TODO: this bursts, dont
						_neep.send_ft1_bitset(
							group_number, peer_number,
							static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
							info_hash.data(), info_hash.size(),
							i,
							have._bytes.data(), have.size_bytes()
						);
					}
				} // else, we have nothing *shrug*
			}

			_queue_send_bitset.pop_front();
		}
	}

	// if we have not reached the total cap for transfers
	// count running transfers
	size_t running_sending_transfer_count {_sending_transfers.size()};
	size_t running_receiving_transfer_count {_receiving_transfers.size()};

	if (running_sending_transfer_count < _max_concurrent_out) {
		// TODO: for each peer? transfer cap per peer?
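		// service one queued chunk request per tick: resolve the chunk index for the requested
		// hash, skip it if that chunk is already being sent to this peer, otherwise start a
		// private FT1 transfer and track it in _sending_transfers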
// TODO: info queue if (!_queue_requested_chunk.empty()) { // then check for chunk requests const auto [group_number, peer_number, ce, chunk_hash, _] = _queue_requested_chunk.front(); auto chunk_idx_vec = ce.get().chunkIndices(chunk_hash); if (!chunk_idx_vec.empty()) { // check if already sending if (!_sending_transfers.containsPeerChunk(group_number, peer_number, ce, chunk_idx_vec.front())) { const auto& info = ce.get(); uint8_t transfer_id {0}; if (_nft.NGC_FT1_send_init_private( group_number, peer_number, static_cast(NGCFT1_file_kind::HASH_SHA1_CHUNK), chunk_hash.data.data(), chunk_hash.size(), chunkSize(info, chunk_idx_vec.front()), &transfer_id )) { _sending_transfers.emplaceChunk( group_number, peer_number, transfer_id, SendingTransfers::Entry::Chunk{ ce, chunk_idx_vec.front() } ); } } // else just remove from queue } // remove from queue regardless _queue_requested_chunk.pop_front(); } } if (running_receiving_transfer_count < _max_concurrent_in) { // strictly priorize info if (!_queue_content_want_info.empty()) { const auto ce = _queue_content_want_info.front(); // make sure we are missing the info assert(!ce.all_of()); assert(!ce.all_of()); assert(!ce.all_of()); assert(!ce.all_of()); assert(ce.all_of()); auto selected_peer_opt = selectPeerForRequest(ce); if (selected_peer_opt.has_value()) { const auto [group_number, peer_number] = selected_peer_opt.value(); //const auto& info = msg.get(); const auto& info_hash = ce.get().hash; _nft.NGC_FT1_send_request_private( group_number, peer_number, static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), info_hash.data(), info_hash.size() ); ce.emplace(0.f); _queue_content_want_info.pop_front(); std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n"; } } } // ran regardless of _max_concurrent_in // new chunk picker code // TODO: need to either split up or remove some things here Systems::chunk_picker_updates( _cr, _os.registry(), peer_open_requests, _receiving_transfers, _nft, delta ); // transfer statistics systems Systems::transfer_tally_update(_os.registry(), getTimeNow()); if (peer_open_requests.empty()) { return 2.f; } else { // pretty conservative and should be ajusted on a per peer, per delay basis // seems to do the trick return 0.05f; } } // gets called back on main thread after a "new" file info got built on a different thread void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts) { // sanity if (!o.all_of()) { assert(false); return; } // update content lookup const auto& info_hash = o.get().hash; _info_to_content[info_hash] = o; // update chunk lookup const auto& cc = o.get(); const auto& info = o.get(); for (size_t i = 0; i < info.chunks.size(); i++) { _chunks[info.chunks[i]] = o; } // remove from info request queue if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), o); it != _queue_content_want_info.end()) { _queue_content_want_info.erase(it); } // TODO: we dont want chunks anymore // TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle) // something happend, update all chunk pickers if (o.all_of()) { for (const auto& pcv : o.get().participants) { Contact3Handle pch{_cr, pcv}; assert(static_cast(pch)); pch.emplace_or_replace(); } } // in both cases, private and public, c (contact to) is the target o.get_or_emplace().targets.emplace(c); // create message const auto c_self = _cr.get(c).self; if 
(!_cr.valid(c_self)) { std::cerr << "SHA1_NGCFT1 error: failed to get self!\n"; return; } const auto msg_e = reg_ptr->create(); reg_ptr->emplace(msg_e, c); reg_ptr->emplace(msg_e, c_self); reg_ptr->emplace(msg_e, ts); // reactive? reg_ptr->emplace(msg_e, ts); reg_ptr->emplace(msg_e, o); //reg_ptr->emplace(msg_e); o.get_or_emplace().messages.push_back({*reg_ptr, msg_e}); //reg_ptr->emplace(e, file_kind); // file id would be sha1_info hash or something //reg_ptr->emplace(e, file_id); if (_cr.any_of(c)) { const uint32_t group_number = _cr.get(c).group_number; uint32_t message_id = 0; // TODO: check return _nft.NGC_FT1_send_message_public(group_number, message_id, static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), info_hash.data(), info_hash.size()); reg_ptr->emplace(msg_e, message_id); } else if ( // non online group _cr.any_of(c) ) { // create msg_id const uint32_t message_id = randombytes_random(); reg_ptr->emplace(msg_e, message_id); } // TODO: else private message reg_ptr->get_or_emplace(msg_e).ts.try_emplace(c_self, ts); reg_ptr->get_or_emplace(msg_e).ts.try_emplace(c_self, ts); _rmm.throwEventConstruct(*reg_ptr, msg_e); // TODO: place in iterate? updateMessages(o); // nop // TODO: remove } bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) { if (!e.e.all_of()) { return false; } if (!e.e.all_of()) { // not ready to load yet, skip return false; } assert(!e.e.all_of()); assert(!e.e.all_of()); assert(!e.e.all_of()); //accept(e.e, e.e.get().save_to_path); // first, open file for write(+readback) std::string full_file_path{e.e.get().save_to_path}; // TODO: replace with filesystem or something // TODO: use bool in action !!! if (full_file_path.back() != '/') { full_file_path += "/"; } // ensure dir exists std::filesystem::create_directories(full_file_path); const auto& info = e.e.get(); full_file_path += info.file_name; e.e.emplace(full_file_path); const bool file_exists = std::filesystem::exists(full_file_path); std::unique_ptr file_impl = construct_file2_rw_mapped(full_file_path, info.file_size); if (!file_impl->isGood()) { std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n"; // we failed opening that filepath, so we should offer the user the oportunity to save it differently e.e.remove(); // stop return false; } { // next, create chuck cache and check for existing data auto& transfer_stats = e.e.get_or_emplace(); auto& lhb = e.e.get_or_emplace(); if (lhb.have.size_bits() < info.chunks.size()) { lhb.have = BitSet{info.chunks.size()}; } auto& cc = e.e.emplace(); cc.have_count = 0; cc.chunk_hash_to_index.clear(); // if copy pasta if (file_exists) { // iterate existing file for (size_t i = 0; i < info.chunks.size(); i++) { const uint64_t chunk_size = info.chunkSize(i); auto existing_data = file_impl->read(chunk_size, i*uint64_t(info.chunk_size)); assert(existing_data.size == chunk_size); if (existing_data.size == chunk_size) { const auto data_hash = SHA1Digest{hash_sha1(existing_data.ptr, existing_data.size)}; const bool data_equal = data_hash == info.chunks.at(i); if (data_equal) { lhb.have.set(i); cc.have_count += 1; // TODO: replace with some progress counter? // or move have_count/want_count or something? transfer_stats.total_down += chunk_size; //std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n"; } else { //std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n"; } } else { // error reading? 
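// a short read here means the preexisting file is smaller than this chunk's span; the chunk stays unmarked and will be requested like any other missing chunk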
} _chunks[info.chunks[i]] = e.e; cc.chunk_hash_to_index[info.chunks[i]].push_back(i); } std::cout << "preexisting " << cc.have_count << "/" << info.chunks.size() << "\n"; if (cc.have_count >= info.chunks.size()) { e.e.emplace_or_replace(); e.e.remove(); } } else { for (size_t i = 0; i < info.chunks.size(); i++) { _chunks[info.chunks[i]] = e.e; cc.chunk_hash_to_index[info.chunks[i]].push_back(i); } } } e.e.emplace_or_replace(std::move(file_impl)); // queue announce that we are participating e.e.get_or_emplace(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f; e.e.remove(); // start requesting from all participants if (e.e.all_of()) { std::cout << "accepted ft has " << e.e.get().participants.size() << " sp\n"; for (const auto cv : e.e.get().participants) { _cr.emplace_or_replace(cv); } } else { std::cout << "accepted ft has NO sp!\n"; } e.e.remove(); updateMessages(e.e); return false; // ? } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { // only interested in sha1 if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) { return false; } //std::cout << "SHA1_NGCFT1: FT1_REQUEST fk:" << int(e.file_kind) << " [" << bin2hex({e.file_id, e.file_id+e.file_id_size}) << "]\n"; if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) { if (e.file_id_size != 20) { // error return false; } SHA1Digest info_hash{e.file_id, e.file_id_size}; if (!_info_to_content.count(info_hash)) { // we dont know about this return false; } auto o = _info_to_content.at(info_hash); if (!o.all_of()) { // we dont have the info for that infohash (yet?) return false; } // TODO: queue instead //queueUpRequestInfo(e.group_number, e.peer_number, info_hash); uint8_t transfer_id {0}; if (_nft.NGC_FT1_send_init_private( e.group_number, e.peer_number, static_cast(e.file_kind), e.file_id, e.file_id_size, o.get().data.size(), &transfer_id )) { _sending_transfers.emplaceInfo( e.group_number, e.peer_number, transfer_id, SendingTransfers::Entry::Info{ o.get().data } ); } const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround } else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) { if (e.file_id_size != 20) { // error return false; } SHA1Digest chunk_hash{e.file_id, e.file_id_size}; if (!_chunks.count(chunk_hash)) { // we dont know about this return false; } auto o = _chunks.at(chunk_hash); { // they advertise interest in the content const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround if (addParticipation(c, o)) { // something happend, update chunk picker assert(static_cast(c)); c.emplace_or_replace(); } } assert(o.all_of()); if (!o.get().haveChunk(o, chunk_hash)) { // we dont have the chunk return false; } // queue good request queueUpRequestChunk(e.group_number, e.peer_number, o, chunk_hash); } else { assert(false && "unhandled case"); } return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) { // only interested in sha1 if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) { return false; } // TODO: make sure we requested this? 
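	// an incoming init either carries the info blob for an info hash we are tracking, or a single chunk of content we already have the info for; anything else falls through without setting e.accept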
if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) { SHA1Digest sha1_info_hash {e.file_id, e.file_id_size}; if (!_info_to_content.count(sha1_info_hash)) { // no idea about this content return false; } auto ce = _info_to_content.at(sha1_info_hash); if (ce.any_of()) { // we already have the info (should) return false; } // TODO: check if e.file_size too large / ask for permission if (e.file_size > 100*1024*1024) { // a info size of 100MiB is ~640GiB for a 128KiB chunk size (default) return false; } _receiving_transfers.emplaceInfo( e.group_number, e.peer_number, e.transfer_id, {ce, std::vector(e.file_size)} ); e.accept = true; const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround } else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) { SHA1Digest sha1_chunk_hash {e.file_id, e.file_id_size}; if (!_chunks.count(sha1_chunk_hash)) { // no idea about this content return false; } auto o = _chunks.at(sha1_chunk_hash); { // they have the content (probably, might be fake, should move this to done) const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround if (addParticipation(c, o)) { // something happend, update chunk picker assert(static_cast(c)); c.emplace_or_replace(); } } assert(o.all_of()); assert(o.all_of()); const auto& cc = o.get(); if (cc.haveChunk(o, sha1_chunk_hash)) { std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n"; // we have the chunk return false; } // TODO: cache position // calc offset_into_file auto idx_vec = cc.chunkIndices(sha1_chunk_hash); assert(!idx_vec.empty()); // CHECK IF TRANSFER IN PROGESS!! 
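		// reject the init if any chunk index mapped to this hash is already being received from this peer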
for (const auto idx : idx_vec) { if (_receiving_transfers.containsPeerChunk(e.group_number, e.peer_number, o, idx)) { std::cerr << "SHA1_NGCFT1 error: " << e.group_number << ":" << e.peer_number << " offered chunk(" << idx << ") it is already receiving!!\n"; return false; } } const auto& info = o.get(); // TODO: check e.file_size assert(e.file_size == info.chunkSize(idx_vec.front())); _receiving_transfers.emplaceChunk( e.group_number, e.peer_number, e.transfer_id, ReceivingTransfers::Entry::Chunk{o, idx_vec} ); e.accept = true; // now running, remove from requested for (const auto it : _receiving_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id).getChunk().chunk_indices) { o.get_or_emplace().chunks.erase(it); } std::cout << "SHA1_NGCFT1: accepted chunk [" << SHA1Digest{sha1_chunk_hash} << "]\n"; } else { assert(false && "unhandled case"); } return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) { if (!_receiving_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) { std::cerr << "SHA1_NGCFT1 waring: unknown transfer " << e.transfer_id << " from " << e.group_number << ":" << e.peer_number << "\n"; return false; } auto& transfer = _receiving_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id); transfer.time_since_activity = 0.f; if (transfer.isInfo()) { std::cout << "SHA1_NGCFT1: got info data " << e.data_size << "@" << e.data_offset << " from " << e.group_number << ":" << e.peer_number << "\n"; auto& info_data = transfer.getInfo().info_data; for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) { info_data[i+e.data_offset] = e.data[i]; } } else if (transfer.isChunk()) { auto o = transfer.getChunk().content; const auto chunk_size = o.get().chunk_size; for (const auto chunk_index : transfer.getChunk().chunk_indices) { const auto offset_into_file = chunk_index * chunk_size; auto* file2 = objGetFile2Write(o); if (file2 == nullptr) { std::cerr << "SHA1_NGCFT1 error: writing file failed, no file object\n"; return false; // early out } if (!file2->write({e.data, e.data_size}, offset_into_file + e.data_offset)) { std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << entt::to_integral(o.entity()) << "@" << offset_into_file + e.data_offset << "\n"; } } auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); if (static_cast(c)) { o.get_or_emplace() .tally[c] .recently_received .push_back( Components::TransferStatsTally::Peer::Entry{ float(getTimeNow()), e.data_size } ) ; } } else { assert(false && "unhandled case"); } return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) { std::cerr << "SHA1_NGCFT1 error: ngcft1 requested data for unknown transfer\n"; return false; } auto& transfer = _sending_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id); transfer.time_since_activity = 0.f; if (transfer.isInfo()) { auto& info_transfer = transfer.getInfo(); for (size_t i = 0; i < e.data_size && (i + e.data_offset) < info_transfer.info_data.size(); i++) { e.data[i] = info_transfer.info_data[i + e.data_offset]; } } else if (transfer.isChunk()) { auto& chunk_transfer = transfer.getChunk(); const auto& info = chunk_transfer.content.get(); auto* file2 = objGetFile2Read(chunk_transfer.content); if (file2 == nullptr) { // return true? 
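			// without a readable file object the requested data span cannot be filled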
return false; // early out } const auto data = file2->read( e.data_size, (chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset ); // TODO: optimize for (size_t i = 0; i < e.data_size && i < data.size; i++) { e.data[i] = data[i]; } // TODO: add event to propergate to messages //_rmm.throwEventUpdate(transfer); // should we? auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); if (static_cast(c)) { chunk_transfer.content.get_or_emplace() .tally[c] .recently_sent .push_back( Components::TransferStatsTally::Peer::Entry{ float(getTimeNow()), data.size } ) ; } } else { assert(false && "not implemented?"); } return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { if (!_receiving_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) { return false; } auto& transfer = _receiving_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id); if (transfer.isInfo()) { auto& info = transfer.getInfo(); auto o = info.content; if (o.any_of()) { // we already have the info, discard _receiving_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id); return true; } // check if data matches hash auto hash = hash_sha1(info.info_data.data(), info.info_data.size()); assert(o.all_of()); if (o.get().hash != hash) { std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n"; // TODO: requeue info request; eg manipulate o.get(); _receiving_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id); return true; } const auto& info_data = o.emplace_or_replace(std::move(info.info_data)).data; auto& ft_info = o.emplace_or_replace(); ft_info.fromBuffer(info_data); { // file info // TODO: not overwrite fi? since same? auto& file_info = o.emplace_or_replace(ft_info.file_name, ft_info.file_size); //auto& file_info = o.emplace_or_replace(); //file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size}; //file_info.total_size = ft_info.file_size; } std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n"; o.remove(); if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), o); it != _queue_content_want_info.end()) { _queue_content_want_info.erase(it); } o.emplace_or_replace(); updateMessages(o); } else if (transfer.isChunk()) { auto o = transfer.getChunk().content; const auto& info = o.get(); auto& cc = o.get(); // HACK: only check first chunk (they *should* all be the same) const auto chunk_index = transfer.getChunk().chunk_indices.front(); const uint64_t offset_into_file = chunk_index * uint64_t(info.chunk_size); assert(chunk_index < info.chunks.size()); const auto chunk_size = info.chunkSize(chunk_index); assert(offset_into_file+chunk_size <= info.file_size); auto* file2 = objGetFile2Read(o); if (file2 == nullptr) { // rip return false; } auto chunk_data = std::move(file2->read(chunk_size, offset_into_file)); assert(!chunk_data.empty()); // check hash of chunk auto got_hash = hash_sha1(chunk_data.ptr, chunk_data.size); if (info.chunks.at(chunk_index) == got_hash) { std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n"; if (!o.all_of()) { { auto& lhb = o.get_or_emplace(BitSet{info.chunks.size()}); for (const auto inner_chunk_index : transfer.getChunk().chunk_indices) { if (lhb.have[inner_chunk_index]) { continue; } // new good chunk lhb.have.set(inner_chunk_index); cc.have_count += 1; // TODO: have wasted + metadata //o.get_or_emplace().total += chunk_data.size; // we already tallied all of them 
but maybe we want to set some other progress indicator here? if (cc.have_count == info.chunks.size()) { // debug check for ([[maybe_unused]] size_t i = 0; i < info.chunks.size(); i++) { assert(lhb.have[i]); } o.emplace_or_replace(); std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n"; // HACK: close file2, to clear ram // TODO: just add a lastActivity comp and close files every x minutes based on that file2 = nullptr; // making sure we dont have a stale ptr o.remove(); // will be recreated on demand break; } } } if (o.all_of()) { o.remove(); // save space } // queue chunk have for all participants // HACK: send immediatly to all participants for (const auto c_part : o.get().participants) { if (!_cr.all_of(c_part)) { continue; } const auto [part_group_number, part_peer_number] = _cr.get(c_part); const auto& info_hash = o.get().hash; // convert size_t to uint32_t const std::vector chunk_indices { transfer.getChunk().chunk_indices.cbegin(), transfer.getChunk().chunk_indices.cend() }; _neep.send_ft1_have( part_group_number, part_peer_number, static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), info_hash.data(), info_hash.size(), chunk_indices.data(), chunk_indices.size() ); } } else { std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n"; } // something happend, update chunk picker auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); //assert(static_cast(c)); // happened, went offline but chunk was still done o.o if (static_cast(c)) { c.emplace_or_replace(); } } else { // bad chunk std::cout << "SHA1_NGCFT1: got BAD chunk from " << e.group_number << ":" << e.peer_number << " [" << info.chunks.at(chunk_index) << "] ; instead got [" << SHA1Digest{got_hash} << "]\n"; } // remove from requested // TODO: remove at init and track running transfers differently // should be done, double check later for (const auto it : transfer.getChunk().chunk_indices) { o.get_or_emplace().chunks.erase(it); } updateMessages(o); // mostly for received bytes } _receiving_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id); return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) { if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) { return false; } auto& transfer = _sending_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id); if (transfer.isChunk()) { updateMessages(transfer.getChunk().content); // mostly for sent bytes } _sending_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id); return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) { if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO) { return false; } uint64_t ts = Message::getTimeMS(); const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround const auto self_c = c.get().self; auto* reg_ptr = _rmm.get(c); if (reg_ptr == nullptr) { std::cerr << "SHA1_NGCFT1 error: cant find reg\n"; return false; } Message3Registry& reg = *reg_ptr; // TODO: check for existence, hs or other syncing mechanics might have sent it already (or like, it arrived 2x or whatever) // TODO: use the message dup test provided via rmm auto new_msg_e = reg.create(); { // contact // from reg.emplace(new_msg_e, c); // to reg.emplace(new_msg_e, c.get().parent); } reg.emplace(new_msg_e, e.message_id); //reg.emplace(new_msg_e); // add sending? 
reg.emplace(new_msg_e, ts); //reg.emplace(new_msg_e, 0); reg.emplace(new_msg_e, ts); // reactive? reg.emplace(new_msg_e); { // by whom reg.get_or_emplace(new_msg_e).ts.try_emplace(self_c, ts); } { // we received it, so we have it auto& rb = reg.get_or_emplace(new_msg_e).ts; rb.try_emplace(c, ts); // TODO: how do we handle partial files??? // tox ft rn only sets self if the file was received fully rb.try_emplace(self_c, ts); } // check if content exists const auto sha1_info_hash = std::vector{e.file_id, e.file_id+e.file_id_size}; ObjectHandle o; if (_info_to_content.count(sha1_info_hash)) { o = _info_to_content.at(sha1_info_hash); std::cout << "SHA1_NGCFT1: new message has existing content\n"; } else { // TODO: backend o = _mfb.newObject(ByteSpan{sha1_info_hash}); _info_to_content[sha1_info_hash] = o; o.emplace(sha1_info_hash); std::cout << "SHA1_NGCFT1: new message has new content\n"; } o.get_or_emplace().messages.push_back({reg, new_msg_e}); reg_ptr->emplace(new_msg_e, o); // HACK: assume the message sender is participating. usually a safe bet. if (addParticipation(c, o)) { // something happend, update chunk picker assert(static_cast(c)); c.emplace_or_replace(); } // HACK: assume the message sender has all o.get_or_emplace().others[c] = {true, {}}; // TODO: queue info dl // TODO: queue info/check if we already have info if (!o.all_of() && !o.all_of()) { bool in_info_want {false}; for (const auto it : _queue_content_want_info) { if (it == o) { in_info_want = true; break; } } if (!in_info_want) { // TODO: check if already receiving _queue_content_want_info.push_back(o); } } else if (o.all_of()){ // remove from info want o.remove(); auto it = std::find(_queue_content_want_info.cbegin(), _queue_content_want_info.cend(), o); if (it != _queue_content_want_info.cend()) { _queue_content_want_info.erase(it); } } // since public o.get_or_emplace().targets.emplace(c.get().parent); _os.throwEventUpdate(o); _rmm.throwEventConstruct(reg, new_msg_e); return true; // false? 
} bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) { if ( // TODO: add support of offline queuing !_cr.all_of(c) ) { return false; } std::cout << "SHA1_NGCFT1: got sendFilePath()\n"; auto* reg_ptr = _rmm.get(c); if (reg_ptr == nullptr) { return false; } // get current time unix epoch utc uint64_t ts = Message::getTimeMS(); _mfb.newFromFile( file_name, file_path, [this, reg_ptr, c, ts](ObjectHandle o) { onSendFileHashFinished(o, reg_ptr, c, ts); } ); return true; } bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Join* e) { const auto group_number = tox_event_group_peer_join_get_group_number(e); const auto peer_number = tox_event_group_peer_join_get_peer_id(e); auto c_peer = _tcm.getContactGroupPeer(group_number, peer_number); auto c_group = _tcm.getContactGroup(group_number); // search for group and/or peer in announce targets _os.registry().view().each([this, c_peer, c_group](const auto ov, const Components::AnnounceTargets& at, Components::ReAnnounceTimer& rat) { if (at.targets.contains(c_group) || at.targets.contains(c_peer)) { rat.lower(); } }); return false; } bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) { const auto group_number = tox_event_group_peer_exit_get_group_number(e); const auto peer_number = tox_event_group_peer_exit_get_peer_id(e); // peer disconnected // - remove from all participantions { // FIXME: this does not work, tcm just delteded the relation ship //auto c = _tcm.getContactGroupPeer(group_number, peer_number); const auto c_it = _tox_peer_to_contact.find(combine_ids(group_number, peer_number)); if (c_it == _tox_peer_to_contact.end()) { return false; } auto c = c_it->second; if (!static_cast(c)) { return false; } c.remove(); for (const auto& [_, o] : _info_to_content) { removeParticipation(c, o); if (o.all_of()) { o.get().others.erase(c); } } } // - clear queues for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) { if (group_number == std::get<0>(*it) && peer_number == std::get<1>(*it)) { it = _queue_requested_chunk.erase(it); } else { it++; } } // TODO: nfcft1 should have fired receive/send done events for all them running transfers return false; } bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) { std::cerr << "SHA1_NGCFT1: got FT1_HAVE s:" << e.chunks.size() << "\n"; if (e.file_kind != static_cast(NGCFT1_file_kind::HASH_SHA1_INFO)) { return false; } SHA1Digest info_hash{e.file_id}; auto itc_it = _info_to_content.find(info_hash); if (itc_it == _info_to_content.end()) { // we are not interested and dont track this return false; } auto o = itc_it->second; if (!static_cast(o)) { std::cerr << "SHA1_NGCFT1 error: tracking info has null object\n"; return false; } const size_t num_total_chunks = o.get().chunks.size(); const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); assert(static_cast(c)); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround // we might not know yet if (addParticipation(c, o)) { // something happend, update chunk picker //c.emplace_or_replace(); } auto& remote_have = o.get_or_emplace().others; if (!remote_have.contains(c)) { // init remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks}); // new have? 
nice //c.emplace_or_replace(); } auto& remote_have_peer = remote_have.at(c); if (remote_have_peer.have_all) { return true; // peer somehow already had all, ignoring } assert(remote_have_peer.have.size_bits() >= num_total_chunks); bool a_valid_change {false}; for (const auto c_i : e.chunks) { if (c_i >= num_total_chunks) { std::cerr << "SHA1_NGCFT1 error: remote sent have with out-of-range chunk index!!!\n"; std::cerr << info_hash << ": " << c_i << " >= " << num_total_chunks << "\n"; continue; } assert(c_i < num_total_chunks); remote_have_peer.have.set(c_i); a_valid_change = true; } if (a_valid_change) { // new have? nice c.emplace_or_replace(); } // check for completion? // TODO: optimize bool test_all {true}; for (size_t i = 0; i < remote_have_peer.have.size_bits(); i++) { if (!remote_have_peer.have[i]) { test_all = false; break; } } if (test_all) { // optimize remote_have_peer.have_all = true; remote_have_peer.have = BitSet{}; } return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) { std::cerr << "SHA1_NGCFT1: got FT1_BITSET o:" << e.start_chunk << " s:" << e.chunk_bitset.size()*8 << "\n"; if (e.file_kind != static_cast(NGCFT1_file_kind::HASH_SHA1_INFO)) { return false; } if (e.chunk_bitset.empty()) { // what return false; } SHA1Digest info_hash{e.file_id}; auto itc_it = _info_to_content.find(info_hash); if (itc_it == _info_to_content.end()) { // we are not interested and dont track this return false; } auto o = itc_it->second; if (!static_cast(o)) { std::cerr << "SHA1_NGCFT1 error: tracking info has null object\n"; return false; } const size_t num_total_chunks = o.get().chunks.size(); // +7 for byte rounding if (num_total_chunks+7 < e.start_chunk + (e.chunk_bitset.size()*8)) { std::cerr << "SHA1_NGCFT1 error: got bitset.size+start that is larger then number of chunks!!\n"; std::cerr << "total:" << num_total_chunks << " start:" << e.start_chunk << " size:" << e.chunk_bitset.size()*8 << "\n"; return false; } const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); assert(static_cast(c)); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround // we might not know yet addParticipation(c, o); auto& remote_have = o.get_or_emplace().others; if (!remote_have.contains(c)) { // init remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks}); } auto& remote_have_peer = remote_have.at(c); if (!remote_have_peer.have_all) { // TODO: maybe unset with bitset? BitSet event_bitset{e.chunk_bitset}; remote_have_peer.have.merge(event_bitset, e.start_chunk); // check for completion? // TODO: optimize bool test_all {true}; for (size_t i = 0; i < remote_have_peer.have.size_bits(); i++) { if (!remote_have_peer.have[i]) { test_all = false; break; } } if (test_all) { // optimize remote_have_peer.have_all = true; remote_have_peer.have = BitSet{}; } } // new have? 
nice c.emplace_or_replace(); return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have_all& e) { std::cerr << "SHA1_NGCFT1: got FT1_HAVE_ALL s:" << e.file_id.size() << "\n"; if (e.file_kind != static_cast(NGCFT1_file_kind::HASH_SHA1_INFO)) { return false; } SHA1Digest info_hash{e.file_id}; auto itc_it = _info_to_content.find(info_hash); if (itc_it == _info_to_content.end()) { // we are not interested and dont track this return false; } auto o = itc_it->second; if (!static_cast(o)) { std::cerr << "SHA1_NGCFT1 error: tracking info has null object\n"; return false; } const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); assert(static_cast(c)); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround // we might not know yet addParticipation(c, o); auto& remote_have = o.get_or_emplace().others; remote_have[c] = Components::RemoteHaveBitset::Entry{true, {}}; // new have? nice c.emplace_or_replace(); return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) { std::cerr << "SHA1_NGCFT1: got PC1_ANNOUNCE s:" << e.id.size() << "\n"; // id is file_kind + id uint32_t file_kind = 0u; static_assert(SHA1Digest{}.size() == 20); if (e.id.size() != sizeof(file_kind) + 20) { // not for us return false; } for (size_t i = 0; i < sizeof(file_kind); i++) { file_kind |= uint32_t(e.id[i]) << (i*8); } if (file_kind != static_cast(NGCFT1_file_kind::HASH_SHA1_INFO)) { return false; } SHA1Digest hash{e.id.data()+sizeof(file_kind), 20}; // if have use hash(-info) for file, add to participants std::cout << "SHA1_NGCFT1: got ParticipationChatter1 announce from " << e.group_number << ":" << e.peer_number << " for " << hash << "\n"; auto itc_it = _info_to_content.find(hash); if (itc_it == _info_to_content.end()) { // we are not interested and dont track this return false; } // add to participants const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround auto o = itc_it->second; if (addParticipation(c, o)) { // something happend, update chunk picker // !!! this is probably too much assert(static_cast(c)); c.emplace_or_replace(); std::cout << "SHA1_NGCFT1: and we where interested!\n"; // we should probably send the bitset back here / add to queue (can be multiple packets) if (o.all_of() && o.get().have_count > 0) { queueBitsetSendFull(c, o); } } return false; }