#include "./sha1_ngcft1.hpp"

// NOTE(review): in this copy every `<...>` token sequence appears to have been
// stripped: the bare `#include` lines below have lost their header names, and
// template argument lists are missing throughout (e.g. `all_of()`, `get()`,
// `static_cast(x)`, `std::optional>`). Those tokens are kept exactly as found;
// restore them from the upstream source before building.

#include
#include
#include
#include
#include

#include "./util.hpp"

#include "./ft1_sha1_info.hpp"
#include "./hash_utils.hpp"

#include
#include

#include "./file_rw_mapped.hpp"

#include "./components.hpp"
#include "./contact_components.hpp"
#include "./chunk_picker.hpp"
#include "./participation.hpp"

#include "./re_announce_systems.hpp"
#include "./chunk_picker_systems.hpp"
#include "./transfer_stats_systems.hpp"

#include
#include
#include
#include
#include
#include

namespace Message::Components {

	using Content = ObjectHandle;

} // Message::Components

// Byte size of chunk `chunk_index`: every chunk is `chunk_size` bytes except
// the last one, which holds whatever remains of `file_size`.
static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
	if (chunk_index+1 == sha1_info.chunks.size()) {
		// last chunk
		return sha1_info.file_size - chunk_index * sha1_info.chunk_size;
	} else {
		return sha1_info.chunk_size;
	}
}

// Queue a peer's request for the chunk `hash` of `obj`, to be served by iterate().
// - already queued for the same (group, peer, hash): only reset its timeout timer
// - a send of that chunk to this peer is already running: do nothing
void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle obj, const SHA1Digest& hash) {
	for (auto& [i_g, i_p, i_o, i_h, i_t] : _queue_requested_chunk) {
		// if already in queue
		if (i_g == group_number && i_p == peer_number && i_h == hash) {
			// update timer
			i_t = 0.f;
			return;
		}
	}

	// check for running transfer
	auto chunk_idx_vec = obj.get().chunkIndices(hash); // list is 1 entry in 99% of cases
	for (const size_t chunk_idx : chunk_idx_vec) {
		if (_sending_transfers.containsPeerChunk(group_number, peer_number, obj, chunk_idx)) {
			// already sending
			return; // skip
		}
	}

	// not in queue yet
	_queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, obj, hash, 0.f));
}

// Mirror the current state of content object `ce` onto every message that
// references it (the list in ce's messages component, asserted present), then
// throw an update event for each of those messages.
void SHA1_NGCFT1::updateMessages(ObjectHandle ce) {
	assert(ce.all_of());

	for (auto msg : ce.get().messages) {
		if (ce.all_of() && !msg.all_of()) {
			msg.emplace(ce.get());
		}
		if (ce.all_of()) {
			msg.emplace_or_replace(ce.get());
		}
		if (ce.all_of()) {
			msg.emplace_or_replace(ce.get());
		}
		if (ce.all_of()) {
			msg.emplace_or_replace(ce.get());
		}
		if (ce.all_of()) {
			msg.emplace_or_replace();
		} else {
			msg.remove();
		}
		if (auto* cc = ce.try_get(); cc != nullptr && cc->have_all) {
			msg.emplace_or_replace();
		}

		_rmm.throwEventUpdate(msg);
	}
}

// Pick a (group_number, peer_number) to request content `ce` from.
// Normally a random non-disconnected participant of `ce` that has a group-peer
// component. With probability 1/40 (when that list is non-empty) it instead
// samples a random non-disconnected, non-self peer from the group of the last
// listed participant. Returns std::nullopt when no candidate is found.
std::optional> SHA1_NGCFT1::selectPeerForRequest(ObjectHandle ce) {
	// get a list of peers we can request this file from
	std::vector> tox_peers;
	for (const auto c : ce.get().participants) {
		// TODO: sort by con state?
		// prio to direct?
		if (const auto* cs = _cr.try_get(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
			continue;
		}

		if (_cr.all_of(c)) {
			const auto& tgpe = _cr.get(c);
			tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
		}
	}

	// 1 in 40 chance to ask random peer instead
	// TODO: config + tweak
	// TODO: save group in content to avoid the tox_peers list build
	// TODO: remove once pc1_announce is shipped
	if (tox_peers.empty() || (_rng()%40) == 0) {
		// meh
		// HACK: determain group based on last tox_peers
		if (!tox_peers.empty()) {
			const uint32_t group_number = tox_peers.back().first;
			auto gch = _tcm.getContactGroup(group_number);
			assert(static_cast(gch));

			std::vector un_tox_peers;
			for (const auto child : gch.get().subs) {
				if (const auto* cs = _cr.try_get(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
					continue;
				}

				if (_cr.all_of(child)) {
					continue; // skip self
				}

				if (_cr.all_of(child)) {
					const auto& tgpe = _cr.get(child);
					un_tox_peers.push_back(tgpe.peer_number);
				}
			}
			if (un_tox_peers.empty()) {
				// no one online, we are out of luck
			} else {
				const size_t sample_i = _rng()%un_tox_peers.size();
				const auto peer_number = un_tox_peers.at(sample_i);

				return std::make_pair(group_number, peer_number);
			}
		}
	} else {
		const size_t sample_i = _rng()%tox_peers.size();
		const auto [group_number, peer_number] = tox_peers.at(sample_i);

		return std::make_pair(group_number, peer_number);
	}

	return std::nullopt;
}

// Queue sending our full have-bitset (or have-all) for `o` to contact `c`;
// iterate() processes at most one queued entry per tick.
// Invalid handles assert and are ignored; objects lacking the required
// component are silently skipped.
void SHA1_NGCFT1::queueBitsetSendFull(Contact3Handle c, ObjectHandle o) {
	if (!static_cast(c) || !static_cast(o)) {
		assert(false);
		return;
	}

	// TODO: only queue if not already sent??

	if (!o.all_of()) {
		return;
	}

	_queue_send_bitset.push_back(QBitsetEntry{c, o});
}

// Store references to the collaborating services and subscribe to the
// message-model, NGCFT1, tox and NGCEXT events this class handles.
SHA1_NGCFT1::SHA1_NGCFT1(
	ObjectStore2& os,
	Contact3Registry& cr,
	RegistryMessageModel& rmm,
	NGCFT1& nft,
	ToxContactModel2& tcm,
	ToxEventProviderI& tep,
	NGCEXTEventProvider& neep
) :
	_os(os),
	_cr(cr),
	_rmm(rmm),
	_nft(nft),
	_tcm(tcm),
	_tep(tep),
	_neep(neep)
{
	// TODO: also create and destroy
	_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
	//_rmm.subscribe(this, RegistryMessageModel_Event::message_construct);
	//_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);

	_nft.subscribe(this, NGCFT1_Event::recv_request);
	_nft.subscribe(this, NGCFT1_Event::recv_init);
	_nft.subscribe(this, NGCFT1_Event::recv_data);
	_nft.subscribe(this, NGCFT1_Event::send_data);
	_nft.subscribe(this, NGCFT1_Event::recv_done);
	_nft.subscribe(this, NGCFT1_Event::send_done);
	_nft.subscribe(this, NGCFT1_Event::recv_message);

	_rmm.subscribe(this, RegistryMessageModel_Event::send_file_path);

	_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT);

	_neep.subscribe(this, NGCEXT_Event::FT1_HAVE);
	_neep.subscribe(this, NGCEXT_Event::FT1_BITSET);
	_neep.subscribe(this, NGCEXT_Event::FT1_HAVE_ALL);
	_neep.subscribe(this, NGCEXT_Event::PC1_ANNOUNCE);
}

// Per-tick driver. In order:
//  1. runs and clears queued info-builder callbacks (under their mutex)
//  2. advances transfer timers, drops queued chunk requests after 10s,
//     re-queues info requests after 15s and forgets requested chunks after 60s
//     (counting the still-open ones per contact)
//  3. re-announces, then sends at most one queued bitset
//  4. starts at most one queued outgoing chunk and at most one info request,
//     each only while below its concurrency cap
//  5. runs the chunk-picker and transfer-statistics systems
// Returns the suggested delay until the next call: 2.f when no chunk requests
// are open, 0.05f otherwise.
float SHA1_NGCFT1::iterate(float delta) {
	//std::cerr << "---------- new tick ----------\n";
	// info builder queue
	if (_info_builder_dirty) {
		std::lock_guard l{_info_builder_queue_mutex};
		_info_builder_dirty = false; // set while holding lock

		for (auto& it : _info_builder_queue) {
			//it.fn();
			it();
		}
		_info_builder_queue.clear();
	}

	entt::dense_map peer_open_requests;

	{ // timers
		// sending transfers
		_sending_transfers.tick(delta);

		// receiving transfers
		_receiving_transfers.tick(delta);

		// queued requests
		for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) {
			float& timer = std::get(*it);
			timer += delta;

			// forget after 10sec
			if (timer >= 10.f) {
				it = _queue_requested_chunk.erase(it);
			} else {
				it++;
			}
		}

		{ // requested info timers
			std::vector timed_out;
			_os.registry().view().each([delta, &timed_out](Object e, Components::ReRequestInfoTimer& rrit) {
				rrit.timer += delta;

				// 15sec, TODO: config
				if (rrit.timer >= 15.f) {
					timed_out.push_back(e);
				}
			});
			for (const auto e : timed_out) {
				// TODO: avoid dups
				_queue_content_want_info.push_back(_os.objectHandle(e));
				_os.registry().remove(e);
				// TODO: throw update?
			}
		}

		{ // requested chunk timers
			_os.registry().view().each([delta, &peer_open_requests](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
				for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
					it->second.timer += delta;

					// TODO: config
					if (it->second.timer >= 60.f) {
						it = ftchunk_requested.chunks.erase(it);
					} else {
						peer_open_requests[it->second.c] += 1;
						it++;
					}
				}
			});
		}
	}

	Systems::re_announce(_os.registry(), _cr, _neep, delta);

	{ // send out bitsets
		// currently 1 per tick
		if (!_queue_send_bitset.empty()) {
			const auto& qe = _queue_send_bitset.front();

			if (static_cast(qe.o) && static_cast(qe.c) && qe.c.all_of() && qe.o.all_of()) {
				const auto [group_number, peer_number] = qe.c.get();
				const auto& info_hash = qe.o.get().hash;
				const auto& cc = qe.o.get();
				const auto& info = qe.o.get();
				const auto total_chunks = info.chunks.size();

				static constexpr size_t bits_per_packet {8u*512u};

				if (cc.have_all) {
					// send have all
					_neep.send_ft1_have_all(
						group_number, peer_number,
						static_cast(NGCFT1_file_kind::HASH_SHA1_INFO),
						info_hash.data(), info_hash.size()
					);
				} else {
					for (size_t i = 0; i < total_chunks; i += bits_per_packet) {
						size_t bits_this_packet = std::min(bits_per_packet, total_chunks-i);

						BitSet have(bits_this_packet); // default init to zero

						// TODO: optimize selective copy bitset
						for (size_t j = i; j < i+bits_this_packet; j++) {
							if (cc.have_chunk[j]) {
								have.set(j-i);
							}
						}

						// TODO: this bursts, dont
						_neep.send_ft1_bitset(
							group_number, peer_number,
							static_cast(NGCFT1_file_kind::HASH_SHA1_INFO),
							info_hash.data(), info_hash.size(),
							i,
							have._bytes.data(), have.size_bytes()
						);
					}
				}
			}

			_queue_send_bitset.pop_front();
		}
	}

	// if we have not reached the total cap for transfers
	// count running transfers
	size_t running_sending_transfer_count {_sending_transfers.size()};
	size_t running_receiving_transfer_count {_receiving_transfers.size()};

	if (running_sending_transfer_count < _max_concurrent_out) {
		// TODO: for each peer? transfer cap per peer?
		// TODO: info queue
		if (!_queue_requested_chunk.empty()) { // then check for chunk requests
			const auto [group_number, peer_number, ce, chunk_hash, _] = _queue_requested_chunk.front();

			auto chunk_idx_vec = ce.get().chunkIndices(chunk_hash);
			if (!chunk_idx_vec.empty()) {

				// check if already sending
				if (!_sending_transfers.containsPeerChunk(group_number, peer_number, ce, chunk_idx_vec.front())) {
					const auto& info = ce.get();

					uint8_t transfer_id {0};
					if (_nft.NGC_FT1_send_init_private(
						group_number, peer_number,
						static_cast(NGCFT1_file_kind::HASH_SHA1_CHUNK),
						chunk_hash.data.data(), chunk_hash.size(),
						chunkSize(info, chunk_idx_vec.front()),
						&transfer_id
					)) {
						_sending_transfers.emplaceChunk(
							group_number, peer_number,
							transfer_id,
							SendingTransfers::Entry::Chunk{
								ce,
								chunk_idx_vec.front()
							}
						);
					}
				} // else just remove from queue
			}
			// remove from queue regardless
			_queue_requested_chunk.pop_front();
		}
	}

	if (running_receiving_transfer_count < _max_concurrent_in) {
		// strictly priorize info
		if (!_queue_content_want_info.empty()) {
			const auto ce = _queue_content_want_info.front();

			// make sure we are missing the info
			assert(!ce.all_of());
			assert(!ce.all_of());
			assert(!ce.all_of());
			assert(!ce.all_of());
			assert(ce.all_of());

			auto selected_peer_opt = selectPeerForRequest(ce);
			if (selected_peer_opt.has_value()) {
				const auto [group_number, peer_number] = selected_peer_opt.value();

				//const auto& info = msg.get();
				const auto& info_hash = ce.get().hash;

				_nft.NGC_FT1_send_request_private(
					group_number, peer_number,
					static_cast(NGCFT1_file_kind::HASH_SHA1_INFO),
					info_hash.data(), info_hash.size()
				);
				ce.emplace(0.f);

				_queue_content_want_info.pop_front();

				std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
			}
		}
	}

	// ran regardless of _max_concurrent_in
	// new chunk picker code
	// TODO: need to either split up or remove some things here
	Systems::chunk_picker_updates(
		_cr,
		_os.registry(),
		peer_open_requests,
		_receiving_transfers,
		_nft,
		delta
	);

	// transfer statistics systems
	Systems::transfer_tally_update(_os.registry(), getTimeNow());

	if (peer_open_requests.empty()) {
		return 2.f;
	} else {
		// pretty conservative and should be ajusted on a per peer, per delay basis
		// seems to do the trick
		return 0.05f;
	}
}

// Message update handler, apparently the "accept file" path: the message
// carries a save path. It:
//  - opens/creates the target file under the save path
//  - hashes any pre-existing data to seed the chunk cache
//  - registers every chunk hash
//  - schedules a (re)announce and marks participants for the chunk picker
// Always returns false.
bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
	// see tox_transfer_manager.cpp for reference
	if (!e.e.all_of()) {
		return false;
	}

	//accept(e.e, e.e.get().save_to_path);

	auto ce = e.e.get();

	//if (!ce.all_of()) {
	if (!ce.all_of()) {
		// not ready to load yet, skip
		return false;
	}
	assert(!ce.all_of());
	assert(!ce.all_of());

	// first, open file for write(+readback)
	std::string full_file_path{e.e.get().save_to_path};
	// TODO: replace with filesystem or something
	// guard: std::string::back() on an empty string is undefined behavior;
	// an empty save path resolves the file relative to the working directory
	if (!full_file_path.empty() && full_file_path.back() != '/') {
		full_file_path += "/";
	}

	// ensure dir exists
	// non-throwing overload: an exception must not escape this event handler.
	// If the directory is unusable, opening the file below presumably fails
	// and is reported by the isGood() check.
	if (std::error_code ec; !full_file_path.empty() && !std::filesystem::create_directories(full_file_path, ec) && ec) {
		std::cerr << "SHA1_NGCFT1 warning: failed creating directory '" << full_file_path << "': " << ec.message() << "\n";
	}

	const auto& info = ce.get();
	full_file_path += info.file_name;

	ce.emplace(std::vector{full_file_path});

	const bool file_exists = std::filesystem::exists(full_file_path);
	std::unique_ptr file_impl = std::make_unique(full_file_path, info.file_size);

	if (!file_impl->isGood()) {
		std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
		// we failed opening that filepath, so we should offer the user the oportunity to save it differently
		e.e.remove(); // stop
		return false;
	}

	{ // next, create chuck cache and check for existing data
		auto& cc = ce.emplace();
		auto& bytes_received = ce.get_or_emplace().total;
		cc.have_chunk = BitSet(info.chunks.size());
		cc.have_all = false;
		cc.have_count = 0;

		cc.chunk_hash_to_index.clear(); // if copy pasta

		if (file_exists) {
			// iterate existing file
			for (size_t i = 0; i < info.chunks.size(); i++) {
				const uint64_t chunk_size = info.chunkSize(i);
				auto existing_data = file_impl->read(chunk_size, i*uint64_t(info.chunk_size));

				assert(existing_data.size == chunk_size);
				if (existing_data.size == chunk_size) {
					const auto data_hash = SHA1Digest{hash_sha1(existing_data.ptr, existing_data.size)};
					const bool data_equal = data_hash == info.chunks.at(i);

					if (data_equal) {
						cc.have_chunk.set(i);
						cc.have_count += 1;
						bytes_received += chunk_size;
						//std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n";
					} else {
						//std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n";
					}
				} else {
					// error reading?
				}

				_chunks[info.chunks[i]] = ce;
				cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
			}
			std::cout << "preexisting " << cc.have_count << "/" << info.chunks.size() << "\n";

			if (cc.have_count >= info.chunks.size()) {
				cc.have_all = true;
				//ce.remove();
			}
		} else {
			for (size_t i = 0; i < info.chunks.size(); i++) {
				_chunks[info.chunks[i]] = ce;
				cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
			}
		}
	}

	ce.emplace(std::move(file_impl));

	// queue announce we are participating
	// since this is the first time, we publicly announce to all
	if (e.e.all_of()) {
		const auto c_f = e.e.get().c;
		const auto c_t = e.e.get().c;

		if (_cr.all_of(c_t)) {
			// public
			ce.get_or_emplace().targets.emplace(c_t);
		} else if (_cr.all_of(c_f)) {
			// private ?
			ce.get_or_emplace().targets.emplace(c_f);
		}
	}
	ce.get_or_emplace(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;

	ce.remove();

	// start requesting from all participants
	if (ce.all_of()) {
		std::cout << "accepted ft has " << ce.get().participants.size() << " sp\n";
		for (const auto cv : ce.get().participants) {
			_cr.emplace_or_replace(cv);
		}
	} else {
		std::cout << "accepted ft has NO sp!\n";
	}

	// should?
	e.e.remove();

	updateMessages(ce);

	return false;
}

// A peer requested a SHA1 info or chunk from us.
// - info: if we have it, a private send is started immediately
// - chunk: the peer is recorded as participant, and the request is queued via
//   queueUpRequestChunk() when we have the chunk
// Returns false for non-SHA1 kinds, malformed ids, and unknown or missing
// resources.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
	// only interested in sha1
	if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) {
		return false;
	}

	//std::cout << "SHA1_NGCFT1: FT1_REQUEST fk:" << int(e.file_kind) << " [" << bin2hex({e.file_id, e.file_id+e.file_id_size}) << "]\n";

	if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) {
		if (e.file_id_size != 20) {
			// error
			return false;
		}

		SHA1Digest info_hash{e.file_id, e.file_id_size};
		if (!_info_to_content.count(info_hash)) {
			// we dont know about this
			return false;
		}

		auto o = _info_to_content.at(info_hash);

		if (!o.all_of()) {
			// we dont have the info for that infohash (yet?)
			return false;
		}

		// TODO: queue instead
		//queueUpRequestInfo(e.group_number, e.peer_number, info_hash);
		uint8_t transfer_id {0};
		// only track the transfer if it was actually started; on failure
		// transfer_id was never assigned (same pattern as the chunk send in iterate())
		if (_nft.NGC_FT1_send_init_private(
			e.group_number, e.peer_number,
			static_cast(e.file_kind),
			e.file_id, e.file_id_size,
			o.get().data.size(),
			&transfer_id
		)) {
			_sending_transfers.emplaceInfo(
				e.group_number, e.peer_number,
				transfer_id,
				SendingTransfers::Entry::Info{
					o.get().data
				}
			);
		}

		const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
		_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
	} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
		if (e.file_id_size != 20) {
			// error
			return false;
		}

		SHA1Digest chunk_hash{e.file_id, e.file_id_size};

		if (!_chunks.count(chunk_hash)) {
			// we dont know about this
			return false;
		}

		auto o = _chunks.at(chunk_hash);

		{ // they advertise interest in the content
			const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
			_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
			if (addParticipation(c, o)) {
				// something happend, update chunk picker
				assert(static_cast(c));
				c.emplace_or_replace();
			}
		}

		assert(o.all_of());

		if (!o.get().haveChunk(chunk_hash)) {
			// we dont have the chunk
			return false;
		}

		// queue good request
		queueUpRequestChunk(e.group_number, e.peer_number, o, chunk_hash);
	} else {
		assert(false && "unhandled case");
	}

	return true;
}

// A peer offers to send us a SHA1 info or chunk; decides by setting e.accept.
// - info: accepted for known content that lacks its info, up to 100MiB
// - chunk: accepted for a known, still-missing chunk that is not already being
//   received from this peer and whose offered size matches the chunk size
// Returns true when the offer was accepted.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
	// only interested in sha1
	if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) {
		return false;
	}

	// TODO: make sure we requested this?

	if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) {
		if (e.file_id_size != 20) {
			// error, not a sha1 (same check as in the recv_request handler)
			return false;
		}

		SHA1Digest sha1_info_hash {e.file_id, e.file_id_size};
		if (!_info_to_content.count(sha1_info_hash)) {
			// no idea about this content
			return false;
		}

		auto ce = _info_to_content.at(sha1_info_hash);

		if (ce.any_of()) {
			// we already have the info (should)
			return false;
		}

		// TODO: check if e.file_size too large / ask for permission
		if (e.file_size > 100*1024*1024) {
			// a info size of 100MiB is ~640GiB for a 128KiB chunk size (default)
			return false;
		}

		_receiving_transfers.emplaceInfo(
			e.group_number, e.peer_number,
			e.transfer_id,
			{ce, std::vector(e.file_size)}
		);

		e.accept = true;

		const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
		_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
	} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
		if (e.file_id_size != 20) {
			// error, not a sha1 (same check as in the recv_request handler)
			return false;
		}

		SHA1Digest sha1_chunk_hash {e.file_id, e.file_id_size};

		if (!_chunks.count(sha1_chunk_hash)) {
			// no idea about this content
			return false;
		}

		auto o = _chunks.at(sha1_chunk_hash);

		{ // they have the content (probably, might be fake, should move this to done)
			const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
			_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
			if (addParticipation(c, o)) {
				// something happend, update chunk picker
				assert(static_cast(c));
				c.emplace_or_replace();
			}
		}

		assert(o.all_of());
		assert(o.all_of());

		const auto& cc = o.get();
		if (cc.haveChunk(sha1_chunk_hash)) {
			std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
			// we have the chunk
			return false;
		}
		// TODO: cache position

		// calc offset_into_file
		auto idx_vec = cc.chunkIndices(sha1_chunk_hash);
		assert(!idx_vec.empty());
		if (idx_vec.empty()) {
			// expected to be non-empty for hashes registered in _chunks;
			// never call front() on an empty vector below
			return false;
		}

		// CHECK IF TRANSFER IN PROGESS!!
		for (const auto idx : idx_vec) {
			if (_receiving_transfers.containsPeerChunk(e.group_number, e.peer_number, o, idx)) {
				std::cerr << "SHA1_NGCFT1 error: " << e.group_number << ":" << e.peer_number << " offered chunk(" << idx << ") it is already receiving!!\n";
				return false;
			}
		}

		const auto& info = o.get();

		// e.file_size is peer controlled, so this must be a runtime check, not
		// an assert: the data handler writes received bytes at chunk offsets
		// without bounding them by the chunk size
		if (e.file_size != info.chunkSize(idx_vec.front())) {
			std::cerr << "SHA1_NGCFT1 error: " << e.group_number << ":" << e.peer_number << " offered chunk(" << idx_vec.front() << ") with size " << e.file_size << ", expected " << info.chunkSize(idx_vec.front()) << "\n";
			return false;
		}

		_receiving_transfers.emplaceChunk(
			e.group_number, e.peer_number,
			e.transfer_id,
			ReceivingTransfers::Entry::Chunk{o, idx_vec}
		);

		e.accept = true;

		// now running, remove from requested
		for (const auto it : _receiving_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id).getChunk().chunk_indices) {
			o.get_or_emplace().chunks.erase(it);
		}

		std::cout << "SHA1_NGCFT1: accepted chunk [" << SHA1Digest{sha1_chunk_hash} << "]\n";
	} else {
		assert(false && "unhandled case");
	}

	return true;
}

// Incoming data for a transfer we track.
// - info: copied into the info buffer, clipped to its size
// - chunk: written to the file at every index the chunk occupies, and the
//   received bytes are tallied per peer
// Returns false for unknown transfers.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
	if (!_receiving_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
		return false;
	}

	auto& transfer = _receiving_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id);

	transfer.time_since_activity = 0.f;
	if (transfer.isInfo()) {
		auto& info_data = transfer.getInfo().info_data;
		for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
			info_data[i+e.data_offset] = e.data[i];
		}
	} else if (transfer.isChunk()) {
		auto o = transfer.getChunk().content;

		assert(o.all_of());
		auto* file = o.get().get();
		assert(file != nullptr);

		const auto chunk_size = o.get().chunk_size;
		for (const auto chunk_index : transfer.getChunk().chunk_indices) {
			// widen before multiplying (as recv_done/send_data do), so files
			// above 4GiB do not overflow where size_t is 32bit
			const auto offset_into_file = chunk_index * uint64_t(chunk_size);

			if (!file->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
				std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
			}
		}

		auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
		if (static_cast(c)) {
			o.get_or_emplace()
				.tally[c]
				.recently_received
				.push_back(
					Components::TransferStatsTally::Peer::Entry{
						float(getTimeNow()),
						e.data_size
					}
				)
			;
		}
	} else {
		assert(false && "unhandled case");
	}

	return true;
}

// Fill e.data for an outgoing transfer we track.
// - info: taken from the info buffer
// - chunk: read from the file at the chunk's offset; sent-byte counters and the
//   per-peer tally are updated
// Returns false for unknown transfers.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
	if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
		return false;
	}

	auto& transfer = _sending_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id);
	transfer.time_since_activity = 0.f;

	if (transfer.isInfo()) {
		auto& info_transfer = transfer.getInfo();
		for (size_t i = 0; i < e.data_size && (i + e.data_offset) < info_transfer.info_data.size(); i++) {
			e.data[i] = info_transfer.info_data[i + e.data_offset];
		}
	} else if (transfer.isChunk()) {
		auto& chunk_transfer = transfer.getChunk();
		const auto& info = chunk_transfer.content.get();
		// TODO: should we really use file?
		const auto data = chunk_transfer.content.get()->read(
			e.data_size,
			(chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset
		);

		// TODO: optimize
		for (size_t i = 0; i < e.data_size && i < data.size; i++) {
			e.data[i] = data[i];
		}

		chunk_transfer.content.get_or_emplace().total += data.size;
		// TODO: add event to propergate to messages
		//_rmm.throwEventUpdate(transfer); // should we?

		auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
		if (static_cast(c)) {
			chunk_transfer.content.get_or_emplace()
				.tally[c]
				.recently_sent
				.push_back(
					Components::TransferStatsTally::Peer::Entry{
						float(getTimeNow()),
						data.size
					}
				)
			;
		}
	} else {
		assert(false && "not implemented?");
	}

	return true;
}

// An incoming transfer we track finished.
// - info: the data is hash-checked, then stored and parsed into the info and
//   file-info components, and the content is dropped from the info want-queue
// - chunk: the data is re-read from the file and hash-checked. On a match the
//   have bits are set (remapping the file once complete), FT1_HAVE is sent to
//   all group-peer participants, and the chunk picker is flagged.
// The transfer is always removed. Returns false for unknown transfers.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
	if (!_receiving_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
		return false;
	}

	auto& transfer = _receiving_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id);

	if (transfer.isInfo()) {
		auto& info = transfer.getInfo();
		auto o = info.content;

		if (o.any_of()) {
			// we already have the info, discard
			_receiving_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id);
			return true;
		}

		// check if data matches hash
		auto hash = hash_sha1(info.info_data.data(), info.info_data.size());

		assert(o.all_of());
		if (o.get().hash != hash) {
			std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n";
			// TODO: requeue info request; eg manipulate o.get();
			_receiving_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id);
			return true;
		}

		const auto& info_data = o.emplace_or_replace(std::move(info.info_data)).data;
		auto& ft_info = o.emplace_or_replace();
		ft_info.fromBuffer(info_data);

		{ // file info
			// TODO: not overwrite fi? since same?
			auto& file_info = o.emplace_or_replace();
			file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
			file_info.total_size = ft_info.file_size;
		}

		std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";

		o.remove();
		if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), o); it != _queue_content_want_info.end()) {
			_queue_content_want_info.erase(it);
		}

		o.emplace_or_replace();

		updateMessages(o);
	} else if (transfer.isChunk()) {
		auto o = transfer.getChunk().content;

		const auto& info = o.get();
		auto& cc = o.get();

		// HACK: only check first chunk (they *should* all be the same)
		const auto chunk_index = transfer.getChunk().chunk_indices.front();
		const uint64_t offset_into_file = chunk_index * uint64_t(info.chunk_size);

		assert(chunk_index < info.chunks.size());
		const auto chunk_size = info.chunkSize(chunk_index);
		assert(offset_into_file+chunk_size <= info.file_size);

		const auto chunk_data = o.get()->read(chunk_size, offset_into_file);
		assert(!chunk_data.empty());

		// check hash of chunk
		auto got_hash = hash_sha1(chunk_data.ptr, chunk_data.size);
		if (info.chunks.at(chunk_index) == got_hash) {
			std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";

			if (!cc.have_all) {
				for (const auto inner_chunk_index : transfer.getChunk().chunk_indices) {
					if (!cc.have_all && !cc.have_chunk[inner_chunk_index]) {
						cc.have_chunk.set(inner_chunk_index);
						cc.have_count += 1;
						if (cc.have_count == info.chunks.size()) {
							// debug check
							for ([[maybe_unused]] size_t i = 0; i < info.chunks.size(); i++) {
								assert(cc.have_chunk[i]);
							}

							cc.have_all = true;
							cc.have_chunk = BitSet(0); // not wasting memory
							std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";

							// HACK: remap file, to clear ram

							// TODO: error checking
							o.get() = std::make_unique(
								o.get().file_list.front(),
								info.file_size
							);
						}

						// good chunk
						// TODO: have wasted + metadata
						o.get_or_emplace().total += chunk_data.size;
					}
				}

				// queue chunk have for all participants
				// HACK: send immediatly to all participants
				for (const auto c_part : o.get().participants) {
					if (!_cr.all_of(c_part)) {
						continue;
					}

					const auto [part_group_number, part_peer_number] = _cr.get(c_part);

					const auto& info_hash = o.get().hash;

					// convert size_t to uint32_t
					const std::vector chunk_indices {
						transfer.getChunk().chunk_indices.cbegin(),
						transfer.getChunk().chunk_indices.cend()
					};

					_neep.send_ft1_have(
						part_group_number, part_peer_number,
						static_cast(NGCFT1_file_kind::HASH_SHA1_INFO),
						info_hash.data(), info_hash.size(),
						chunk_indices.data(), chunk_indices.size()
					);
				}

				if (!cc.have_all) { // debug print self have set
					std::cout << "DEBUG print have bitset: s:" << cc.have_chunk.size_bits();
					for (size_t i = 0; i < cc.have_chunk.size_bytes(); i++) {
						if (i % 32 == 0) {
							printf("\n");
						}
						// f cout
						printf("%.2x", (uint16_t)cc.have_chunk.data()[i]);
					}
					printf("\n");
				}
			} else {
				std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
			}

			// something happend, update chunk picker
			auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
			assert(static_cast(c));
			c.emplace_or_replace();
		} else {
			// bad chunk
			std::cout << "SHA1_NGCFT1: got BAD chunk from " << e.group_number << ":" << e.peer_number << " [" << info.chunks.at(chunk_index) << "] ; instead got [" << SHA1Digest{got_hash} << "]\n";
		}

		// remove from requested
		// TODO: remove at init and track running transfers differently
		// should be done, double check later
		for (const auto it : transfer.getChunk().chunk_indices) {
			o.get_or_emplace().chunks.erase(it);
		}

		updateMessages(o); // mostly for received bytes
	}

	_receiving_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id);
	return true;
}

// An outgoing transfer we track finished: refresh the messages for chunk
// transfers (sent byte count), then forget the transfer.
// Returns false for unknown transfers.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
	if (!_sending_transfers.containsPeerTransfer(e.group_number, e.peer_number, e.transfer_id)) {
		return false;
	}

	auto& transfer = _sending_transfers.getTransfer(e.group_number, e.peer_number, e.transfer_id);

	if (transfer.isChunk()) {
		updateMessages(transfer.getChunk().content); // mostly for sent bytes
	}
	_sending_transfers.removePeerTransfer(e.group_number, e.peer_number, e.transfer_id);

	return true;
}

// Incoming SHA1-info file message. It:
//  - creates a message entity in the sender's registry
//  - links the message to the content object for its info hash, creating that
//    object if unknown
//  - records the sender as a participant assumed to have all chunks
//  - queues an info request when the info is still missing
// Returns false for other file kinds or when no registry exists for the sender.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
	if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO) {
		return false;
	}

	uint64_t ts = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count();

	const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
	_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
	const auto self_c = c.get().self;

	auto* reg_ptr = _rmm.get(c);
	if (reg_ptr == nullptr) {
		std::cerr << "SHA1_NGCFT1 error: cant find reg\n";
		return false;
	}

	Message3Registry& reg = *reg_ptr;
	// TODO: check for existence, hs or other syncing mechanics might have sent it already (or like, it arrived 2x or whatever)
	// TODO: use the message dup test provided via rmm
	auto new_msg_e = reg.create();

	{ // contact
		// from
		reg.emplace(new_msg_e, c);

		// to
		reg.emplace(new_msg_e, c.get().parent);
	}

	reg.emplace(new_msg_e, e.message_id);

	reg.emplace(new_msg_e); // add sending?

	reg.emplace(new_msg_e, ts);
	//reg.emplace(new_msg_e, 0);
	reg.emplace(new_msg_e, ts); // reactive?

	reg.emplace(new_msg_e);

	{ // by whom
		reg.get_or_emplace(new_msg_e).ts.try_emplace(self_c, ts);
	}

	{ // we received it, so we have it
		auto& rb = reg.get_or_emplace(new_msg_e).ts;
		rb.try_emplace(c, ts);
		// TODO: how do we handle partial files???
		// tox ft rn only sets self if the file was received fully
		rb.try_emplace(self_c, ts);
	}

	// check if content exists
	const auto sha1_info_hash = std::vector{e.file_id, e.file_id+e.file_id_size};
	ObjectHandle ce;
	if (_info_to_content.count(sha1_info_hash)) {
		ce = _info_to_content.at(sha1_info_hash);
		std::cout << "SHA1_NGCFT1: new message has existing content\n";
	} else {
		// TODO: backend
		ce = {_os.registry(), _os.registry().create()};
		_info_to_content[sha1_info_hash] = ce;
		std::cout << "SHA1_NGCFT1: new message has new content\n";

		//ce.emplace(sha1_info);
		//ce.emplace(sha1_info_data); // keep around? or file?
		ce.emplace(sha1_info_hash);
	}
	ce.get_or_emplace().messages.push_back({reg, new_msg_e});
	reg_ptr->emplace(new_msg_e, ce);

	// HACK: assume the message sender is participating. usually a safe bet.
	if (addParticipation(c, ce)) {
		// something happend, update chunk picker
		assert(static_cast(c));
		c.emplace_or_replace();
	}

	// HACK: assume the message sender has all
	ce.get_or_emplace().others[c] = {true, {}};

	if (!ce.all_of() && !ce.all_of()) {
		// TODO: check if already receiving
		_queue_content_want_info.push_back(ce);
	}

	// TODO: queue info dl

	//reg_ptr->emplace(e, sha1_info);
	//reg_ptr->emplace(e, sha1_info_data); // keep around? or file?
	//reg.emplace(new_msg_e, std::vector{e.file_id, e.file_id+e.file_id_size});

	if (auto* cc = ce.try_get(); cc != nullptr && cc->have_all) {
		reg_ptr->emplace(new_msg_e);
	}

	if (ce.all_of()) {
		reg_ptr->emplace(new_msg_e, ce.get());
	}
	if (ce.all_of()) {
		reg_ptr->emplace(new_msg_e, ce.get());
	}
	if (ce.all_of()) {
		reg_ptr->emplace(new_msg_e, ce.get());
	}

	// TODO: queue info/check if we already have info

	_rmm.throwEventConstruct(reg, new_msg_e);

	return true; // false?
} bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) { if ( // TODO: add support of offline queuing !_cr.all_of(c) ) { return false; } std::cout << "SHA1_NGCFT1: got sendFilePath()\n"; auto* reg_ptr = _rmm.get(c); if (reg_ptr == nullptr) { return false; } // get current time unix epoch utc uint64_t ts = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); std::thread(std::move([ // copy everything self = this, ts, c, reg_ptr, file_name_ = std::string(file_name), file_path_ = std::string(file_path) ]() mutable { auto file_impl = std::make_unique(file_path_, -1); if (!file_impl->isGood()) { { std::lock_guard l{self->_info_builder_queue_mutex}; self->_info_builder_queue.push_back([file_path_](){ // back on iterate thread std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n"; }); self->_info_builder_dirty = true; // still in scope, set before mutex unlock } return; } // 1. build info by hashing all chunks FT1InfoSHA1 sha1_info; // build info sha1_info.file_name = file_name_; sha1_info.file_size = file_impl->_file_size; // TODO: remove the reliance on implementation details { // build chunks // HACK: load file fully // ... 
its only a hack if its not memory mapped, but reading in chunk_sized chunks is probably a good idea anyway const auto file_data = file_impl->read(file_impl->_file_size, 0); size_t i = 0; for (; i + sha1_info.chunk_size < file_data.size; i += sha1_info.chunk_size) { sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, sha1_info.chunk_size)); } if (i < file_data.size) { sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, file_data.size-i)); } } file_impl.reset(); { std::lock_guard l{self->_info_builder_queue_mutex}; self->_info_builder_queue.push_back(std::move([ self, ts, c, reg_ptr, file_name_, file_path_, sha1_info = std::move(sha1_info) ]() mutable { // // back on iterate thread auto file_impl = std::make_unique(file_path_, sha1_info.file_size); if (!file_impl->isGood()) { std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n"; return; } // 2. hash info std::vector sha1_info_data; std::vector sha1_info_hash; std::cout << "SHA1_NGCFT1 info is: \n" << sha1_info; sha1_info_data = sha1_info.toBuffer(); std::cout << "SHA1_NGCFT1 sha1_info size: " << sha1_info_data.size() << "\n"; sha1_info_hash = hash_sha1(sha1_info_data.data(), sha1_info_data.size()); std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n"; // check if content exists ObjectHandle ce; if (self->_info_to_content.count(sha1_info_hash)) { ce = self->_info_to_content.at(sha1_info_hash); // TODO: check if content is incomplete and use file instead if (!ce.all_of()) { ce.emplace(sha1_info); } if (!ce.all_of()) { ce.emplace(sha1_info_data); } // hash has to be set already // Components::FT1InfoSHA1Hash { // lookup tables and have auto& cc = ce.get_or_emplace(); cc.have_all = true; // skip have vec, since all //cc.have_chunk cc.have_count = sha1_info.chunks.size(); // need? 
self->_info_to_content[sha1_info_hash] = ce; cc.chunk_hash_to_index.clear(); // for cpy pst for (size_t i = 0; i < sha1_info.chunks.size(); i++) { self->_chunks[sha1_info.chunks[i]] = ce; cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); } } { // file info // TODO: not overwrite fi? since same? auto& file_info = ce.emplace_or_replace(); file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size}; file_info.total_size = file_impl->_file_size; ce.emplace_or_replace(std::vector{std::string{file_path_}}); } // cleanup file if (ce.all_of()) { // replace ce.remove(); } ce.emplace(std::move(file_impl)); if (!ce.all_of()) { ce.emplace(0u); } ce.remove(); // we dont want the info anymore ce.remove(); if (auto it = std::find(self->_queue_content_want_info.begin(), self->_queue_content_want_info.end(), ce); it != self->_queue_content_want_info.end()) { self->_queue_content_want_info.erase(it); } // TODO: we dont want chunks anymore // TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle) } else { // TODO: backend ce = {self->_os.registry(), self->_os.registry().create()}; self->_info_to_content[sha1_info_hash] = ce; ce.emplace(sha1_info); ce.emplace(sha1_info_data); // keep around? or file? ce.emplace(sha1_info_hash); { // lookup tables and have auto& cc = ce.emplace(); cc.have_all = true; // skip have vec, since all //cc.have_chunk cc.have_count = sha1_info.chunks.size(); // need? 
self->_info_to_content[sha1_info_hash] = ce; cc.chunk_hash_to_index.clear(); // for cpy pst for (size_t i = 0; i < sha1_info.chunks.size(); i++) { self->_chunks[sha1_info.chunks[i]] = ce; cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); } } { // file info auto& file_info = ce.emplace(); //const auto& file = ce.get(); file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size}; file_info.total_size = file_impl->_file_size; ce.emplace(std::vector{std::string{file_path_}}); } ce.emplace(std::move(file_impl)); ce.emplace(0u); } // something happend, update all chunk pickers if (ce.all_of()) { for (const auto& pcv : ce.get().participants) { Contact3Handle pch{self->_cr, pcv}; assert(static_cast(pch)); pch.emplace_or_replace(); } } const auto c_self = self->_cr.get(c).self; if (!self->_cr.valid(c_self)) { std::cerr << "SHA1_NGCFT1 error: failed to get self!\n"; return; } const auto msg_e = reg_ptr->create(); reg_ptr->emplace(msg_e, c); reg_ptr->emplace(msg_e, c_self); reg_ptr->emplace(msg_e, ts); // reactive? reg_ptr->emplace(msg_e, ts); reg_ptr->emplace(msg_e); reg_ptr->emplace(msg_e); ce.get_or_emplace().messages.push_back({*reg_ptr, msg_e}); //reg_ptr->emplace(e, file_kind); // file id would be sha1_info hash or something //reg_ptr->emplace(e, file_id); // remove? 
done in updateMessages() anyway if (ce.all_of()) { reg_ptr->emplace(msg_e, ce.get()); } if (ce.all_of()) { reg_ptr->emplace(msg_e, ce.get()); } if (ce.all_of()) { reg_ptr->emplace(msg_e, ce.get()); } // TODO: determine if this is true //reg_ptr->emplace(e); if (self->_cr.any_of(c)) { const uint32_t group_number = self->_cr.get(c).group_number; uint32_t message_id = 0; // TODO: check return self->_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size()); reg_ptr->emplace(msg_e, message_id); } else if ( // non online group self->_cr.any_of(c) ) { // create msg_id const uint32_t message_id = randombytes_random(); reg_ptr->emplace(msg_e, message_id); } reg_ptr->get_or_emplace(msg_e).ts.try_emplace(c_self, ts); reg_ptr->get_or_emplace(msg_e).ts.try_emplace(c_self, ts); self->_rmm.throwEventConstruct(*reg_ptr, msg_e); // TODO: place in iterate? self->updateMessages(ce); })); self->_info_builder_dirty = true; // still in scope, set before mutex unlock } })).detach(); return true; } bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) { const auto group_number = tox_event_group_peer_exit_get_group_number(e); const auto peer_number = tox_event_group_peer_exit_get_peer_id(e); // peer disconnected // - remove from all participantions { // FIXME: this does not work, tcm just delteded the relation ship //auto c = _tcm.getContactGroupPeer(group_number, peer_number); const auto c_it = _tox_peer_to_contact.find(combine_ids(group_number, peer_number)); if (c_it == _tox_peer_to_contact.end()) { return false; } auto c = c_it->second; if (!static_cast(c)) { return false; } c.remove(); for (const auto& [_, o] : _info_to_content) { removeParticipation(c, o); if (o.all_of()) { o.get().others.erase(c); } } } // - clear queues for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) { if (group_number == std::get<0>(*it) && peer_number == std::get<1>(*it)) { 
it = _queue_requested_chunk.erase(it); } else { it++; } } // TODO: nfcft1 should have fired receive/send done events for all them running transfers return false; } bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) { std::cerr << "SHA1_NGCFT1: got FT1_HAVE s:" << e.chunks.size() << "\n"; if (e.file_kind != static_cast(NGCFT1_file_kind::HASH_SHA1_INFO)) { return false; } SHA1Digest info_hash{e.file_id}; auto itc_it = _info_to_content.find(info_hash); if (itc_it == _info_to_content.end()) { // we are not interested and dont track this return false; } auto o = itc_it->second; if (!static_cast(o)) { std::cerr << "SHA1_NGCFT1 error: tracking info has null object\n"; return false; } const size_t num_total_chunks = o.get().chunks.size(); const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); assert(static_cast(c)); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround // we might not know yet if (addParticipation(c, o)) { // something happend, update chunk picker c.emplace_or_replace(); } auto& remote_have = o.get_or_emplace().others; if (!remote_have.contains(c)) { // init remote_have.emplace(c, Components::RemoteHave::Entry{false, num_total_chunks}); // new have? nice // (always update on biset, not always on have) c.emplace_or_replace(); } auto& remote_have_peer = remote_have.at(c); if (!remote_have_peer.have_all) { assert(remote_have_peer.have.size_bits() >= num_total_chunks); for (const auto c_i : e.chunks) { if (c_i >= num_total_chunks) { std::cerr << "SHA1_NGCFT1 error: remote sent have with out-of-range chunk index!!!\n"; std::cerr << info_hash << ": " << c_i << " >= " << num_total_chunks << "\n"; continue; } assert(c_i < num_total_chunks); remote_have_peer.have.set(c_i); } // check for completion? 
// TODO: optimize bool test_all {true}; for (size_t i = 0; i < remote_have_peer.have.size_bits(); i++) { if (!remote_have_peer.have[i]) { test_all = false; break; } } if (test_all) { // optimize remote_have_peer.have_all = true; remote_have_peer.have = BitSet{}; } } return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) { std::cerr << "SHA1_NGCFT1: got FT1_BITSET o:" << e.start_chunk << " s:" << e.chunk_bitset.size()*8 << "\n"; if (e.file_kind != static_cast(NGCFT1_file_kind::HASH_SHA1_INFO)) { return false; } if (e.chunk_bitset.empty()) { // what return false; } SHA1Digest info_hash{e.file_id}; auto itc_it = _info_to_content.find(info_hash); if (itc_it == _info_to_content.end()) { // we are not interested and dont track this return false; } auto o = itc_it->second; if (!static_cast(o)) { std::cerr << "SHA1_NGCFT1 error: tracking info has null object\n"; return false; } const size_t num_total_chunks = o.get().chunks.size(); // +7 for byte rounding if (num_total_chunks+7 < e.start_chunk + (e.chunk_bitset.size()*8)) { std::cerr << "SHA1_NGCFT1 error: got bitset.size+start that is larger then number of chunks!!\n"; std::cerr << "total:" << num_total_chunks << " start:" << e.start_chunk << " size:" << e.chunk_bitset.size()*8 << "\n"; return false; } const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); assert(static_cast(c)); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround // we might not know yet addParticipation(c, o); auto& remote_have = o.get_or_emplace().others; if (!remote_have.contains(c)) { // init remote_have.emplace(c, Components::RemoteHave::Entry{false, num_total_chunks}); } auto& remote_have_peer = remote_have.at(c); if (!remote_have_peer.have_all) { // TODO: maybe unset with bitset? BitSet event_bitset{e.chunk_bitset}; remote_have_peer.have.merge(event_bitset, e.start_chunk); // check for completion? 
// TODO: optimize bool test_all {true}; for (size_t i = 0; i < remote_have_peer.have.size_bits(); i++) { if (!remote_have_peer.have[i]) { test_all = false; break; } } if (test_all) { // optimize remote_have_peer.have_all = true; remote_have_peer.have = BitSet{}; } } // new have? nice // (always update on bitset, not always on have) c.emplace_or_replace(); return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have_all& e) { std::cerr << "SHA1_NGCFT1: got FT1_HAVE_ALL s:" << e.file_id.size() << "\n"; if (e.file_kind != static_cast(NGCFT1_file_kind::HASH_SHA1_INFO)) { return false; } SHA1Digest info_hash{e.file_id}; auto itc_it = _info_to_content.find(info_hash); if (itc_it == _info_to_content.end()) { // we are not interested and dont track this return false; } auto o = itc_it->second; if (!static_cast(o)) { std::cerr << "SHA1_NGCFT1 error: tracking info has null object\n"; return false; } const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); assert(static_cast(c)); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround // we might not know yet addParticipation(c, o); auto& remote_have = o.get_or_emplace().others; remote_have[c] = Components::RemoteHave::Entry{true, {}}; // new have? 
nice // (always update on have_all, not always on have) c.emplace_or_replace(); return true; } bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_pc1_announce& e) { std::cerr << "SHA1_NGCFT1: got PC1_ANNOUNCE s:" << e.id.size() << "\n"; // id is file_kind + id uint32_t file_kind = 0u; static_assert(SHA1Digest{}.size() == 20); if (e.id.size() != sizeof(file_kind) + 20) { // not for us return false; } for (size_t i = 0; i < sizeof(file_kind); i++) { file_kind |= uint32_t(e.id[i]) << (i*8); } if (file_kind != static_cast(NGCFT1_file_kind::HASH_SHA1_INFO)) { return false; } SHA1Digest hash{e.id.data()+sizeof(file_kind), 20}; // if have use hash(-info) for file, add to participants std::cout << "SHA1_NGCFT1: got ParticipationChatter1 announce from " << e.group_number << ":" << e.peer_number << " for " << hash << "\n"; auto itc_it = _info_to_content.find(hash); if (itc_it == _info_to_content.end()) { // we are not interested and dont track this return false; } // add to participants const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround auto o = itc_it->second; if (addParticipation(c, o)) { // something happend, update chunk picker // !!! this is probably too much assert(static_cast(c)); c.emplace_or_replace(); std::cout << "SHA1_NGCFT1: and we where interested!\n"; // we should probably send the bitset back here / add to queue (can be multiple packets) if (o.all_of() && o.get().have_count > 0) { queueBitsetSendFull(c, o); } } return false; }