From 211cce65de7f02f8a2db2c7e926b86a05f327ac1 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sat, 19 Aug 2023 00:15:09 +0200 Subject: [PATCH] receiving files works. taking into account that chunks can excist multiple times --- src/sha1_ngcft1.cpp | 134 ++++++++++++++++++++++++++++++-------------- src/sha1_ngcft1.hpp | 4 +- 2 files changed, 94 insertions(+), 44 deletions(-) diff --git a/src/sha1_ngcft1.cpp b/src/sha1_ngcft1.cpp index 94ee72a..caa755c 100644 --- a/src/sha1_ngcft1.cpp +++ b/src/sha1_ngcft1.cpp @@ -48,12 +48,17 @@ namespace Components { std::vector have_chunk; bool have_all {false}; size_t have_count {0}; - entt::dense_map chunk_hash_to_index; + entt::dense_map> chunk_hash_to_index; - std::optional chunkIndex(const SHA1Digest& hash) const; + std::vector chunkIndices(const SHA1Digest& hash) const; bool haveChunk(const SHA1Digest& hash) const; }; + struct FT1ChunkSHA1Requested { + // requested chunks with a timer since last request + entt::dense_map chunks; + }; + struct SuspectedParticipants { entt::dense_set participants; }; @@ -72,12 +77,12 @@ namespace Components { } // Components -std::optional Components::FT1ChunkSHA1Cache::chunkIndex(const SHA1Digest& hash) const { +std::vector Components::FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const { const auto it = chunk_hash_to_index.find(hash); if (it != chunk_hash_to_index.cend()) { return it->second; } else { - return std::nullopt; + return {}; } } @@ -86,8 +91,9 @@ bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const { return true; } - if (auto i_opt = chunkIndex(hash); i_opt.has_value()) { - return have_chunk[i_opt.value()]; + if (auto i_vec = chunkIndices(hash); !i_vec.empty()) { + // TODO: should i test all? + return have_chunk[i_vec.front()]; } // not part of this file @@ -314,6 +320,20 @@ void SHA1_NGCFT1::iterate(float delta) { _contentr.remove(e); } } + { // requested chunk timers + _contentr.view().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) { + for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) { + it->second += delta; + + // 20sec, TODO: config + if (it->second >= 20.f) { + it = ftchunk_requested.chunks.erase(it); + } else { + it++; + } + } + }); + } } // if we have not reached the total cap for transfers @@ -333,8 +353,8 @@ void SHA1_NGCFT1::iterate(float delta) { if (!_queue_requested_chunk.empty()) { // then check for chunk requests const auto [group_number, peer_number, ce, chunk_hash, _] = _queue_requested_chunk.front(); - auto chunk_idx_opt = ce.get().chunkIndex(chunk_hash); - if (chunk_idx_opt.has_value()) { + auto chunk_idx_vec = ce.get().chunkIndices(chunk_hash); + if (!chunk_idx_vec.empty()) { // check if already sending bool already_sending_to_this_peer = false; @@ -342,7 +362,7 @@ void SHA1_NGCFT1::iterate(float delta) { for (const auto& [_2, t] : _sending_transfers.at(combineIds(group_number, peer_number))) { if (std::holds_alternative(t.v)) { const auto& v = std::get(t.v); - if (v.content == ce && v.chunk_index == chunk_idx_opt.value()) { + if (v.content == ce && v.chunk_index == chunk_idx_vec.front()) { // already sending already_sending_to_this_peer = true; break; @@ -359,13 +379,13 @@ void SHA1_NGCFT1::iterate(float delta) { group_number, peer_number, static_cast(NGCFT1_file_kind::HASH_SHA1_CHUNK), chunk_hash.data.data(), chunk_hash.size(), - chunkSize(info, chunk_idx_opt.value()), + chunkSize(info, chunk_idx_vec.front()), &transfer_id )) { _sending_transfers [combineIds(group_number, peer_number)] [transfer_id] // TODO: also save index? - .v = SendingTransfer::Chunk{ce, chunk_idx_opt.value()}; + .v = SendingTransfer::Chunk{ce, chunk_idx_vec.front()}; } } // else just remove from queue } @@ -422,12 +442,32 @@ void SHA1_NGCFT1::iterate(float delta) { continue; } + // check by hash + if (cc.haveChunk(info.chunks.at(chunk_idx))) { + // TODO: fix this, a completed chunk should fill all the indecies it occupies + cc.have_chunk[chunk_idx] = true; + cc.have_count += 1; + if (cc.have_count == info.chunks.size()) { + cc.have_all = true; + cc.have_chunk.clear(); + break; + } + continue; + } + + auto& requested_chunks = ce.get_or_emplace().chunks; + if (requested_chunks.count(chunk_idx)) { + // already requested + continue; + } + // request chunk_idx _nft.NGC_FT1_send_request_private( group_number, peer_number, static_cast(NGCFT1_file_kind::HASH_SHA1_CHUNK), info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size() ); + requested_chunks[chunk_idx] = 0.f; std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n"; break; @@ -493,10 +533,11 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) { cc.have_all = false; cc.have_count = 0; + cc.chunk_hash_to_index.clear(); // if copy pasta for (size_t i = 0; i < info.chunks.size(); i++) { cc.have_chunk.push_back(false); _chunks[info.chunks[i]] = ce; - cc.chunk_hash_to_index[info.chunks[i]] = i; + cc.chunk_hash_to_index[info.chunks[i]].push_back(i); } } @@ -654,18 +695,18 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) { // TODO: cache position // calc offset_into_file - auto idx_opt = cc.chunkIndex(sha1_chunk_hash); - assert(idx_opt.has_value()); + auto idx_vec = cc.chunkIndices(sha1_chunk_hash); + assert(!idx_vec.empty()); const auto& info = ce.get(); // TODO: check e.file_size - assert(e.file_size == info.chunkSize(idx_opt.value())); + assert(e.file_size == info.chunkSize(idx_vec.front())); _receiving_transfers [combineIds(e.group_number, e.peer_number)] [e.transfer_id] - .v = ReceivingTransfer::Chunk{ce, idx_opt.value()}; + .v = ReceivingTransfer::Chunk{ce, idx_vec}; e.accept = true; @@ -695,15 +736,18 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) { } } else if (std::holds_alternative(tv)) { auto ce = std::get(tv).content; - const auto offset_into_file = std::get(tv).chunk_index * ce.get().chunk_size; assert(ce.all_of()); auto* file = ce.get().get(); assert(file != nullptr); - // TODO: avoid temporary copy - // TODO: check return - file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size}); + for (const auto chunk_index : std::get(tv).chunk_indices) { + const auto offset_into_file = chunk_index* ce.get().chunk_size; + + // TODO: avoid temporary copy + // TODO: check return + file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size}); + } } else { assert(false && "unhandled case"); } @@ -818,7 +862,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { auto ce = std::get(tv).content; const auto& info = ce.get(); auto& cc = ce.get(); - const auto chunk_index = std::get(tv).chunk_index; + + // HACK: only check first chunk (they *should* all be the same) + const auto chunk_index = std::get(tv).chunk_indices.front(); const auto offset_into_file = chunk_index * info.chunk_size; assert(chunk_index < info.chunks.size()); @@ -832,24 +878,27 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { if (info.chunks.at(chunk_index) == got_hash) { std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n"; - // TODO: check for have all - if (!cc.have_all && !cc.have_chunk.at(chunk_index)) { - cc.have_chunk.at(chunk_index) = true; - cc.have_count += 1; - if (cc.have_count == info.chunks.size()) { - // debug check - for (const bool it : cc.have_chunk) { - assert(it); + if (!cc.have_all) { + for (const auto inner_chunk_index : std::get(tv).chunk_indices) { + if (!cc.have_all && !cc.have_chunk.at(inner_chunk_index)) { + cc.have_chunk.at(inner_chunk_index) = true; + cc.have_count += 1; + if (cc.have_count == info.chunks.size()) { + // debug check + for (const bool it : cc.have_chunk) { + assert(it); + } + + cc.have_all = true; + cc.have_chunk.clear(); // not wasting memory + std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n"; + } + + // good chunk + // TODO: have wasted + metadata + ce.get_or_emplace().total += chunk_data.size(); } - - cc.have_all = true; - cc.have_chunk.clear(); // not wasting memory - std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n"; } - - // good chunk - // TODO: have wasted + metadata - ce.get_or_emplace().total += chunk_data.size(); } else { std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n"; } @@ -1075,10 +1124,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std cc.have_count = sha1_info.chunks.size(); // need? _info_to_content[sha1_info_hash] = ce; - for (size_t i = sha1_info.chunks.size(); i > 0; i--) { - _chunks[sha1_info.chunks[i-1]] = ce; - // chunks can have more then 1 index ..., for now, build reverse and have the first index be the real index - cc.chunk_hash_to_index[sha1_info.chunks[i-1]] = i-1; + cc.chunk_hash_to_index.clear(); // for cpy pst + for (size_t i = 0; i < sha1_info.chunks.size(); i++) { + _chunks[sha1_info.chunks[i]] = ce; + cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); } } @@ -1135,9 +1184,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std cc.have_count = sha1_info.chunks.size(); // need? _info_to_content[sha1_info_hash] = ce; + cc.chunk_hash_to_index.clear(); // for cpy pst for (size_t i = 0; i < sha1_info.chunks.size(); i++) { _chunks[sha1_info.chunks[i]] = ce; - cc.chunk_hash_to_index[sha1_info.chunks[i]] = i; + cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); } } diff --git a/src/sha1_ngcft1.hpp b/src/sha1_ngcft1.hpp index 74eb555..165e9ad 100644 --- a/src/sha1_ngcft1.hpp +++ b/src/sha1_ngcft1.hpp @@ -37,6 +37,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { // sha1 chunk index // TODO: optimize lookup + // TODO: multiple contents. hashes might be unique, but data is not entt::dense_map _chunks; // group_number, peer_number, content, chunk_hash, timer @@ -76,8 +77,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { struct Chunk { ContentHandle content; - size_t chunk_index; - //uint64_t offset_into_file; + std::vector chunk_indices; // or data? // if memmapped, this would be just a pointer };