refactor to chunk index and harden sender to not send on already sending transfers
This commit is contained in:
parent
96669e0202
commit
04875e999f
@ -335,21 +335,39 @@ void SHA1_NGCFT1::iterate(float delta) {
|
|||||||
|
|
||||||
auto chunk_idx_opt = ce.get<Components::FT1ChunkSHA1Cache>().chunkIndex(chunk_hash);
|
auto chunk_idx_opt = ce.get<Components::FT1ChunkSHA1Cache>().chunkIndex(chunk_hash);
|
||||||
if (chunk_idx_opt.has_value()) {
|
if (chunk_idx_opt.has_value()) {
|
||||||
const auto& info = ce.get<Components::FT1InfoSHA1>();
|
|
||||||
|
|
||||||
uint8_t transfer_id {0};
|
// check if already sending
|
||||||
if (_nft.NGC_FT1_send_init_private(
|
bool already_sending_to_this_peer = false;
|
||||||
group_number, peer_number,
|
if (_sending_transfers.count(combineIds(group_number, peer_number))) {
|
||||||
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
|
for (const auto& [_2, t] : _sending_transfers.at(combineIds(group_number, peer_number))) {
|
||||||
chunk_hash.data.data(), chunk_hash.size(),
|
if (std::holds_alternative<SendingTransfer::Chunk>(t.v)) {
|
||||||
chunkSize(info, chunk_idx_opt.value()),
|
const auto& v = std::get<SendingTransfer::Chunk>(t.v);
|
||||||
&transfer_id
|
if (v.content == ce && v.chunk_index == chunk_idx_opt.value()) {
|
||||||
)) {
|
// already sending
|
||||||
_sending_transfers
|
already_sending_to_this_peer = true;
|
||||||
[combineIds(group_number, peer_number)]
|
break;
|
||||||
[transfer_id] // TODO: also save index?
|
}
|
||||||
.v = SendingTransfer::Chunk{ce, chunk_idx_opt.value() * info.chunk_size};
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!already_sending_to_this_peer) {
|
||||||
|
const auto& info = ce.get<Components::FT1InfoSHA1>();
|
||||||
|
|
||||||
|
uint8_t transfer_id {0};
|
||||||
|
if (_nft.NGC_FT1_send_init_private(
|
||||||
|
group_number, peer_number,
|
||||||
|
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
|
||||||
|
chunk_hash.data.data(), chunk_hash.size(),
|
||||||
|
chunkSize(info, chunk_idx_opt.value()),
|
||||||
|
&transfer_id
|
||||||
|
)) {
|
||||||
|
_sending_transfers
|
||||||
|
[combineIds(group_number, peer_number)]
|
||||||
|
[transfer_id] // TODO: also save index?
|
||||||
|
.v = SendingTransfer::Chunk{ce, chunk_idx_opt.value()};
|
||||||
|
}
|
||||||
|
} // else just remove from queue
|
||||||
}
|
}
|
||||||
// remove from queue regardless
|
// remove from queue regardless
|
||||||
_queue_requested_chunk.pop_front();
|
_queue_requested_chunk.pop_front();
|
||||||
@ -640,14 +658,14 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
|||||||
assert(idx_opt.has_value());
|
assert(idx_opt.has_value());
|
||||||
|
|
||||||
const auto& info = ce.get<Components::FT1InfoSHA1>();
|
const auto& info = ce.get<Components::FT1InfoSHA1>();
|
||||||
uint64_t offset_into_file = uint64_t(info.chunk_size) * idx_opt.value();
|
|
||||||
|
|
||||||
// TODO: check e.file_size
|
// TODO: check e.file_size
|
||||||
|
assert(e.file_size == info.chunkSize(idx_opt.value()));
|
||||||
|
|
||||||
_receiving_transfers
|
_receiving_transfers
|
||||||
[combineIds(e.group_number, e.peer_number)]
|
[combineIds(e.group_number, e.peer_number)]
|
||||||
[e.transfer_id]
|
[e.transfer_id]
|
||||||
.v = ReceivingTransfer::Chunk{ce, offset_into_file};
|
.v = ReceivingTransfer::Chunk{ce, idx_opt.value()};
|
||||||
|
|
||||||
e.accept = true;
|
e.accept = true;
|
||||||
|
|
||||||
@ -676,8 +694,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
|
|||||||
info_data[i+e.data_offset] = e.data[i];
|
info_data[i+e.data_offset] = e.data[i];
|
||||||
}
|
}
|
||||||
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
|
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
|
||||||
const auto offset_into_file = std::get<ReceivingTransfer::Chunk>(tv).offset_into_file;
|
|
||||||
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
|
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
|
||||||
|
const auto offset_into_file = std::get<ReceivingTransfer::Chunk>(tv).chunk_index * ce.get<Components::FT1InfoSHA1>().chunk_size;
|
||||||
|
|
||||||
assert(ce.all_of<Message::Components::Transfer::File>());
|
assert(ce.all_of<Message::Components::Transfer::File>());
|
||||||
auto* file = ce.get<Message::Components::Transfer::File>().get();
|
auto* file = ce.get<Message::Components::Transfer::File>().get();
|
||||||
@ -717,8 +735,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
|
|||||||
}
|
}
|
||||||
} else if (std::holds_alternative<SendingTransfer::Chunk>(transfer.v)) {
|
} else if (std::holds_alternative<SendingTransfer::Chunk>(transfer.v)) {
|
||||||
auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
|
auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
|
||||||
|
const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
|
||||||
// TODO: should we really use file?
|
// TODO: should we really use file?
|
||||||
const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read(chunk_transfer.offset_into_file + e.data_offset, e.data_size);
|
const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read((chunk_transfer.chunk_index * info.chunk_size) + e.data_offset, e.data_size);
|
||||||
|
|
||||||
// TODO: optimize
|
// TODO: optimize
|
||||||
for (size_t i = 0; i < e.data_size && i < data.size(); i++) {
|
for (size_t i = 0; i < e.data_size && i < data.size(); i++) {
|
||||||
@ -796,12 +815,12 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
|||||||
|
|
||||||
updateMessages(ce);
|
updateMessages(ce);
|
||||||
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
|
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
|
||||||
const auto offset_into_file = std::get<ReceivingTransfer::Chunk>(tv).offset_into_file;
|
|
||||||
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
|
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
|
||||||
const auto& info = ce.get<Components::FT1InfoSHA1>();
|
const auto& info = ce.get<Components::FT1InfoSHA1>();
|
||||||
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
|
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
|
||||||
|
const auto chunk_index = std::get<ReceivingTransfer::Chunk>(tv).chunk_index;
|
||||||
|
const auto offset_into_file = chunk_index * info.chunk_size;
|
||||||
|
|
||||||
const auto chunk_index = offset_into_file/info.chunk_size;
|
|
||||||
assert(chunk_index < info.chunks.size());
|
assert(chunk_index < info.chunks.size());
|
||||||
const auto chunk_size = info.chunkSize(chunk_index);
|
const auto chunk_size = info.chunkSize(chunk_index);
|
||||||
assert(offset_into_file+chunk_size <= info.file_size);
|
assert(offset_into_file+chunk_size <= info.file_size);
|
||||||
|
@ -53,7 +53,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
|
|||||||
|
|
||||||
struct Chunk {
|
struct Chunk {
|
||||||
ContentHandle content;
|
ContentHandle content;
|
||||||
uint64_t offset_into_file;
|
size_t chunk_index; // <.< remove offset_into_file
|
||||||
|
//uint64_t offset_into_file;
|
||||||
// or data?
|
// or data?
|
||||||
// if memmapped, this would be just a pointer
|
// if memmapped, this would be just a pointer
|
||||||
};
|
};
|
||||||
@ -75,7 +76,8 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
|
|||||||
|
|
||||||
struct Chunk {
|
struct Chunk {
|
||||||
ContentHandle content;
|
ContentHandle content;
|
||||||
uint64_t offset_into_file;
|
size_t chunk_index;
|
||||||
|
//uint64_t offset_into_file;
|
||||||
// or data?
|
// or data?
|
||||||
// if memmapped, this would be just a pointer
|
// if memmapped, this would be just a pointer
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user