receiving files works. taking into account that chunks can excist multiple times

This commit is contained in:
Green Sky 2023-08-19 00:15:09 +02:00
parent 04875e999f
commit 211cce65de
No known key found for this signature in database
2 changed files with 94 additions and 44 deletions

View File

@ -48,12 +48,17 @@ namespace Components {
std::vector<bool> have_chunk;
bool have_all {false};
size_t have_count {0};
entt::dense_map<SHA1Digest, size_t> chunk_hash_to_index;
entt::dense_map<SHA1Digest, std::vector<size_t>> chunk_hash_to_index;
std::optional<size_t> chunkIndex(const SHA1Digest& hash) const;
std::vector<size_t> chunkIndices(const SHA1Digest& hash) const;
bool haveChunk(const SHA1Digest& hash) const;
};
struct FT1ChunkSHA1Requested {
// requested chunks with a timer since last request
entt::dense_map<size_t, float> chunks;
};
struct SuspectedParticipants {
entt::dense_set<Contact3> participants;
};
@ -72,12 +77,12 @@ namespace Components {
} // Components
std::optional<size_t> Components::FT1ChunkSHA1Cache::chunkIndex(const SHA1Digest& hash) const {
std::vector<size_t> Components::FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const {
const auto it = chunk_hash_to_index.find(hash);
if (it != chunk_hash_to_index.cend()) {
return it->second;
} else {
return std::nullopt;
return {};
}
}
@ -86,8 +91,9 @@ bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
return true;
}
if (auto i_opt = chunkIndex(hash); i_opt.has_value()) {
return have_chunk[i_opt.value()];
if (auto i_vec = chunkIndices(hash); !i_vec.empty()) {
// TODO: should i test all?
return have_chunk[i_vec.front()];
}
// not part of this file
@ -314,6 +320,20 @@ void SHA1_NGCFT1::iterate(float delta) {
_contentr.remove<Components::ReRequestInfoTimer>(e);
}
}
{ // requested chunk timers
_contentr.view<Components::FT1ChunkSHA1Requested>().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) {
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
it->second += delta;
// 20sec, TODO: config
if (it->second >= 20.f) {
it = ftchunk_requested.chunks.erase(it);
} else {
it++;
}
}
});
}
}
// if we have not reached the total cap for transfers
@ -333,8 +353,8 @@ void SHA1_NGCFT1::iterate(float delta) {
if (!_queue_requested_chunk.empty()) { // then check for chunk requests
const auto [group_number, peer_number, ce, chunk_hash, _] = _queue_requested_chunk.front();
auto chunk_idx_opt = ce.get<Components::FT1ChunkSHA1Cache>().chunkIndex(chunk_hash);
if (chunk_idx_opt.has_value()) {
auto chunk_idx_vec = ce.get<Components::FT1ChunkSHA1Cache>().chunkIndices(chunk_hash);
if (!chunk_idx_vec.empty()) {
// check if already sending
bool already_sending_to_this_peer = false;
@ -342,7 +362,7 @@ void SHA1_NGCFT1::iterate(float delta) {
for (const auto& [_2, t] : _sending_transfers.at(combineIds(group_number, peer_number))) {
if (std::holds_alternative<SendingTransfer::Chunk>(t.v)) {
const auto& v = std::get<SendingTransfer::Chunk>(t.v);
if (v.content == ce && v.chunk_index == chunk_idx_opt.value()) {
if (v.content == ce && v.chunk_index == chunk_idx_vec.front()) {
// already sending
already_sending_to_this_peer = true;
break;
@ -359,13 +379,13 @@ void SHA1_NGCFT1::iterate(float delta) {
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
chunk_hash.data.data(), chunk_hash.size(),
chunkSize(info, chunk_idx_opt.value()),
chunkSize(info, chunk_idx_vec.front()),
&transfer_id
)) {
_sending_transfers
[combineIds(group_number, peer_number)]
[transfer_id] // TODO: also save index?
.v = SendingTransfer::Chunk{ce, chunk_idx_opt.value()};
.v = SendingTransfer::Chunk{ce, chunk_idx_vec.front()};
}
} // else just remove from queue
}
@ -422,12 +442,32 @@ void SHA1_NGCFT1::iterate(float delta) {
continue;
}
// check by hash
if (cc.haveChunk(info.chunks.at(chunk_idx))) {
// TODO: fix this, a completed chunk should fill all the indecies it occupies
cc.have_chunk[chunk_idx] = true;
cc.have_count += 1;
if (cc.have_count == info.chunks.size()) {
cc.have_all = true;
cc.have_chunk.clear();
break;
}
continue;
}
auto& requested_chunks = ce.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
if (requested_chunks.count(chunk_idx)) {
// already requested
continue;
}
// request chunk_idx
_nft.NGC_FT1_send_request_private(
group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size()
);
requested_chunks[chunk_idx] = 0.f;
std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n";
break;
@ -493,10 +533,11 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
cc.have_all = false;
cc.have_count = 0;
cc.chunk_hash_to_index.clear(); // if copy pasta
for (size_t i = 0; i < info.chunks.size(); i++) {
cc.have_chunk.push_back(false);
_chunks[info.chunks[i]] = ce;
cc.chunk_hash_to_index[info.chunks[i]] = i;
cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
}
}
@ -654,18 +695,18 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
// TODO: cache position
// calc offset_into_file
auto idx_opt = cc.chunkIndex(sha1_chunk_hash);
assert(idx_opt.has_value());
auto idx_vec = cc.chunkIndices(sha1_chunk_hash);
assert(!idx_vec.empty());
const auto& info = ce.get<Components::FT1InfoSHA1>();
// TODO: check e.file_size
assert(e.file_size == info.chunkSize(idx_opt.value()));
assert(e.file_size == info.chunkSize(idx_vec.front()));
_receiving_transfers
[combineIds(e.group_number, e.peer_number)]
[e.transfer_id]
.v = ReceivingTransfer::Chunk{ce, idx_opt.value()};
.v = ReceivingTransfer::Chunk{ce, idx_vec};
e.accept = true;
@ -695,15 +736,18 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
}
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
const auto offset_into_file = std::get<ReceivingTransfer::Chunk>(tv).chunk_index * ce.get<Components::FT1InfoSHA1>().chunk_size;
assert(ce.all_of<Message::Components::Transfer::File>());
auto* file = ce.get<Message::Components::Transfer::File>().get();
assert(file != nullptr);
for (const auto chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
const auto offset_into_file = chunk_index* ce.get<Components::FT1InfoSHA1>().chunk_size;
// TODO: avoid temporary copy
// TODO: check return
file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size});
}
} else {
assert(false && "unhandled case");
}
@ -818,7 +862,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
const auto& info = ce.get<Components::FT1InfoSHA1>();
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
const auto chunk_index = std::get<ReceivingTransfer::Chunk>(tv).chunk_index;
// HACK: only check first chunk (they *should* all be the same)
const auto chunk_index = std::get<ReceivingTransfer::Chunk>(tv).chunk_indices.front();
const auto offset_into_file = chunk_index * info.chunk_size;
assert(chunk_index < info.chunks.size());
@ -832,9 +878,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
if (info.chunks.at(chunk_index) == got_hash) {
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
// TODO: check for have all
if (!cc.have_all && !cc.have_chunk.at(chunk_index)) {
cc.have_chunk.at(chunk_index) = true;
if (!cc.have_all) {
for (const auto inner_chunk_index : std::get<ReceivingTransfer::Chunk>(tv).chunk_indices) {
if (!cc.have_all && !cc.have_chunk.at(inner_chunk_index)) {
cc.have_chunk.at(inner_chunk_index) = true;
cc.have_count += 1;
if (cc.have_count == info.chunks.size()) {
// debug check
@ -850,6 +897,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
// good chunk
// TODO: have wasted + metadata
ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size();
}
}
} else {
std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
}
@ -1075,10 +1124,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
cc.have_count = sha1_info.chunks.size(); // need?
_info_to_content[sha1_info_hash] = ce;
for (size_t i = sha1_info.chunks.size(); i > 0; i--) {
_chunks[sha1_info.chunks[i-1]] = ce;
// chunks can have more then 1 index ..., for now, build reverse and have the first index be the real index
cc.chunk_hash_to_index[sha1_info.chunks[i-1]] = i-1;
cc.chunk_hash_to_index.clear(); // for cpy pst
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
_chunks[sha1_info.chunks[i]] = ce;
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
}
}
@ -1135,9 +1184,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
cc.have_count = sha1_info.chunks.size(); // need?
_info_to_content[sha1_info_hash] = ce;
cc.chunk_hash_to_index.clear(); // for cpy pst
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
_chunks[sha1_info.chunks[i]] = ce;
cc.chunk_hash_to_index[sha1_info.chunks[i]] = i;
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
}
}

View File

@ -37,6 +37,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
// sha1 chunk index
// TODO: optimize lookup
// TODO: multiple contents. hashes might be unique, but data is not
entt::dense_map<SHA1Digest, ContentHandle> _chunks;
// group_number, peer_number, content, chunk_hash, timer
@ -76,8 +77,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
struct Chunk {
ContentHandle content;
size_t chunk_index;
//uint64_t offset_into_file;
std::vector<size_t> chunk_indices;
// or data?
// if memmapped, this would be just a pointer
};