Compare commits
2 Commits
3fd6183c21
...
d19fc6ba30
Author | SHA1 | Date | |
---|---|---|---|
d19fc6ba30 | |||
613b183592 |
@ -602,9 +602,9 @@ bool NGCEXTEventProvider::send_ft1_have(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// rest is chunks
|
// rest is chunks
|
||||||
for (size_t i = 0; i < chunks_size; i++) {
|
for (size_t c_i = 0; c_i < chunks_size; c_i++) {
|
||||||
for (size_t i = 0; i < sizeof(chunks_data[i]); i++) {
|
for (size_t i = 0; i < sizeof(chunks_data[c_i]); i++) {
|
||||||
pkg.push_back((chunks_data[i]>>(i*8)) & 0xff);
|
pkg.push_back((chunks_data[c_i]>>(i*8)) & 0xff);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,35 +1,69 @@
|
|||||||
#include "./chunk_picker.hpp"
|
#include "./chunk_picker.hpp"
|
||||||
|
|
||||||
#include <solanaceae/contact/contact_model3.hpp>
|
#include <solanaceae/tox_contacts/components.hpp>
|
||||||
#include <solanaceae/object_store/object_store.hpp>
|
|
||||||
|
|
||||||
#include "./components.hpp"
|
#include "./components.hpp"
|
||||||
|
|
||||||
#include <entt/container/dense_map.hpp>
|
|
||||||
#include <entt/container/dense_set.hpp>
|
|
||||||
|
|
||||||
#include <cstddef>
|
|
||||||
#include <cstdint>
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
||||||
|
|
||||||
|
void ChunkPicker::updateParticipation(
|
||||||
|
Contact3Handle c,
|
||||||
|
ObjectRegistry& objreg
|
||||||
|
) {
|
||||||
|
// replaces them in place
|
||||||
|
participating.clear();
|
||||||
|
participating_unfinished.clear();
|
||||||
|
|
||||||
|
for (const Object ov : objreg.view<Components::SuspectedParticipants>()) {
|
||||||
|
const ObjectHandle o {objreg, ov};
|
||||||
|
|
||||||
|
participating.emplace(o);
|
||||||
|
|
||||||
|
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!o.get<Components::FT1ChunkSHA1Cache>().have_all) {
|
||||||
|
participating_unfinished.emplace(o, ParticipationEntry{});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
||||||
Contact3Handle c,
|
Contact3Handle c,
|
||||||
ObjectRegistry& objreg,
|
ObjectRegistry& objreg,
|
||||||
ReceivingTransfers& rt
|
ReceivingTransfers& rt
|
||||||
//NGCFT1& nft
|
//NGCFT1& nft
|
||||||
) {
|
) {
|
||||||
|
if (!static_cast<bool>(c)) {
|
||||||
|
assert(false); return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!c.all_of<Contact::Components::ToxGroupPeerEphemeral>()) {
|
||||||
|
assert(false); return {};
|
||||||
|
}
|
||||||
|
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
|
||||||
|
|
||||||
std::vector<ContentChunkR> req_ret;
|
std::vector<ContentChunkR> req_ret;
|
||||||
|
|
||||||
// count running tf and open requests
|
// count running tf and open requests
|
||||||
// TODO: implement
|
const size_t num_ongoing_transfers = rt.sizePeer(group_number, peer_number);
|
||||||
const size_t num_requests = max_tf_chunk_requests;
|
// TODO: account for open requests
|
||||||
|
// TODO: base max on rate(chunks per sec), gonna be ass with variable chunk size
|
||||||
|
const size_t num_requests = std::max<int64_t>(0, max_tf_chunk_requests-num_ongoing_transfers);
|
||||||
|
|
||||||
// while n < X
|
// while n < X
|
||||||
while (false && !participating_unfinished.empty()) {
|
|
||||||
|
if (participating_unfinished.empty()) {
|
||||||
|
participating_in_last = entt::null;
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
// round robin content (remember last obj)
|
// round robin content (remember last obj)
|
||||||
if (!objreg.valid(participating_in_last)) {
|
if (!objreg.valid(participating_in_last) || !participating_unfinished.count(participating_in_last)) {
|
||||||
participating_in_last = participating_unfinished.begin()->first;
|
participating_in_last = participating_unfinished.begin()->first;
|
||||||
//participating_in_last = *participating_unfinished.begin();
|
//participating_in_last = *participating_unfinished.begin();
|
||||||
}
|
}
|
||||||
@ -84,6 +118,8 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
|||||||
chunk_candidates.invert();
|
chunk_candidates.invert();
|
||||||
}
|
}
|
||||||
const auto total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
|
const auto total_chunks = o.get<Components::FT1InfoSHA1>().chunks.size();
|
||||||
|
auto& requested_chunks = o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
|
||||||
|
|
||||||
// TODO: trim off round up to 8, since they are now always set
|
// TODO: trim off round up to 8, since they are now always set
|
||||||
|
|
||||||
// now select (globaly) unrequested other have
|
// now select (globaly) unrequested other have
|
||||||
@ -113,12 +149,21 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
|
|||||||
continue; // skip
|
continue; // skip
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: also check against globally running transfers!!!
|
// second check against global requests (this might differ based on strat)
|
||||||
|
if (requested_chunks.count(i) != 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// third we check against globally running transfers (this might differ based on strat)
|
||||||
|
if (rt.containsChunk(o, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// if nothing else blocks this, add to ret
|
// if nothing else blocks this, add to ret
|
||||||
req_ret.push_back(ContentChunkR{o, i});
|
req_ret.push_back(ContentChunkR{o, i});
|
||||||
}
|
|
||||||
|
assert(requested_chunks.count(i) == 0);
|
||||||
|
requested_chunks[i] = 0.f;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,6 +41,11 @@ struct ChunkPicker {
|
|||||||
entt::dense_set<Object> participating;
|
entt::dense_set<Object> participating;
|
||||||
Object participating_in_last {entt::null};
|
Object participating_in_last {entt::null};
|
||||||
|
|
||||||
|
void updateParticipation(
|
||||||
|
Contact3Handle c,
|
||||||
|
ObjectRegistry& objreg
|
||||||
|
);
|
||||||
|
|
||||||
// tick
|
// tick
|
||||||
//void sendInfoRequests();
|
//void sendInfoRequests();
|
||||||
// is this like a system?
|
// is this like a system?
|
||||||
@ -54,7 +59,7 @@ struct ChunkPicker {
|
|||||||
size_t chunk_index;
|
size_t chunk_index;
|
||||||
};
|
};
|
||||||
// returns list of chunks to request
|
// returns list of chunks to request
|
||||||
std::vector<ContentChunkR> updateChunkRequests(
|
[[nodiscard]] std::vector<ContentChunkR> updateChunkRequests(
|
||||||
Contact3Handle c,
|
Contact3Handle c,
|
||||||
ObjectRegistry& objreg,
|
ObjectRegistry& objreg,
|
||||||
ReceivingTransfers& rt
|
ReceivingTransfers& rt
|
||||||
|
@ -45,7 +45,7 @@ static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash) {
|
void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ObjectHandle content, const SHA1Digest& hash) {
|
||||||
for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) {
|
for (auto& [i_g, i_p, i_o, i_h, i_t] : _queue_requested_chunk) {
|
||||||
// if already in queue
|
// if already in queue
|
||||||
if (i_g == group_number && i_p == peer_number && i_h == hash) {
|
if (i_g == group_number && i_p == peer_number && i_h == hash) {
|
||||||
// update timer
|
// update timer
|
||||||
@ -261,6 +261,7 @@ void SHA1_NGCFT1::iterate(float delta) {
|
|||||||
float& timer = std::get<float>(*it);
|
float& timer = std::get<float>(*it);
|
||||||
timer += delta;
|
timer += delta;
|
||||||
|
|
||||||
|
// forget after 10sec
|
||||||
if (timer >= 10.f) {
|
if (timer >= 10.f) {
|
||||||
it = _queue_requested_chunk.erase(it);
|
it = _queue_requested_chunk.erase(it);
|
||||||
} else {
|
} else {
|
||||||
@ -290,8 +291,8 @@ void SHA1_NGCFT1::iterate(float delta) {
|
|||||||
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
|
for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) {
|
||||||
it->second += delta;
|
it->second += delta;
|
||||||
|
|
||||||
// 20sec, TODO: config
|
// 15sec, TODO: config
|
||||||
if (it->second >= 20.f) {
|
if (it->second >= 15.f) {
|
||||||
it = ftchunk_requested.chunks.erase(it);
|
it = ftchunk_requested.chunks.erase(it);
|
||||||
} else {
|
} else {
|
||||||
it++;
|
it++;
|
||||||
@ -386,6 +387,7 @@ void SHA1_NGCFT1::iterate(float delta) {
|
|||||||
|
|
||||||
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
|
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
|
||||||
}
|
}
|
||||||
|
#if 0
|
||||||
} else if (!_queue_content_want_chunk.empty()) {
|
} else if (!_queue_content_want_chunk.empty()) {
|
||||||
const auto ce = _queue_content_want_chunk.front();
|
const auto ce = _queue_content_want_chunk.front();
|
||||||
|
|
||||||
@ -450,7 +452,47 @@ void SHA1_NGCFT1::iterate(float delta) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// new chunk picker code
|
||||||
|
_cr.view<ChunkPicker>().each([this](const Contact3 cv, ChunkPicker& cp) {
|
||||||
|
Contact3Handle c{_cr, cv};
|
||||||
|
// HACK: expensive, dont do every tick, only on events
|
||||||
|
// do verification in debug instead?
|
||||||
|
cp.updateParticipation(
|
||||||
|
c,
|
||||||
|
_os.registry()
|
||||||
|
);
|
||||||
|
|
||||||
|
assert(!cp.participating.empty());
|
||||||
|
|
||||||
|
auto new_requests = cp.updateChunkRequests(
|
||||||
|
c,
|
||||||
|
_os.registry(),
|
||||||
|
_receiving_transfers
|
||||||
|
);
|
||||||
|
|
||||||
|
if (new_requests.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(c.all_of<Contact::Components::ToxGroupPeerEphemeral>());
|
||||||
|
const auto [group_number, peer_number] = c.get<Contact::Components::ToxGroupPeerEphemeral>();
|
||||||
|
|
||||||
|
for (const auto [r_o, r_idx] : new_requests) {
|
||||||
|
auto& cc = r_o.get<Components::FT1ChunkSHA1Cache>();
|
||||||
|
const auto& info = r_o.get<Components::FT1InfoSHA1>();
|
||||||
|
|
||||||
|
// request chunk_idx
|
||||||
|
_nft.NGC_FT1_send_request_private(
|
||||||
|
group_number, peer_number,
|
||||||
|
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_CHUNK),
|
||||||
|
info.chunks.at(r_idx).data.data(), info.chunks.at(r_idx).size()
|
||||||
|
);
|
||||||
|
std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(r_idx) << "] from " << group_number << ":" << peer_number << "\n";
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -645,22 +687,22 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto ce = _chunks.at(chunk_hash);
|
auto o = _chunks.at(chunk_hash);
|
||||||
|
|
||||||
{ // they advertise interest in the content
|
{ // they advertise interest in the content
|
||||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||||
addParticipation(c, ce);
|
addParticipation(c, o);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(ce.all_of<Components::FT1ChunkSHA1Cache>());
|
assert(o.all_of<Components::FT1ChunkSHA1Cache>());
|
||||||
|
|
||||||
if (!ce.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
|
if (!o.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
|
||||||
// we dont have the chunk
|
// we dont have the chunk
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// queue good request
|
// queue good request
|
||||||
queueUpRequestChunk(e.group_number, e.peer_number, ce, chunk_hash);
|
queueUpRequestChunk(e.group_number, e.peer_number, o, chunk_hash);
|
||||||
} else {
|
} else {
|
||||||
assert(false && "unhandled case");
|
assert(false && "unhandled case");
|
||||||
}
|
}
|
||||||
@ -711,19 +753,17 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto ce = _chunks.at(sha1_chunk_hash);
|
auto o = _chunks.at(sha1_chunk_hash);
|
||||||
|
|
||||||
// CHECK IF TRANSFER IN PROGESS!!
|
|
||||||
|
|
||||||
{ // they have the content (probably, might be fake, should move this to done)
|
{ // they have the content (probably, might be fake, should move this to done)
|
||||||
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
|
||||||
addParticipation(c, ce);
|
addParticipation(c, o);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(ce.all_of<Components::FT1InfoSHA1>());
|
assert(o.all_of<Components::FT1InfoSHA1>());
|
||||||
assert(ce.all_of<Components::FT1ChunkSHA1Cache>());
|
assert(o.all_of<Components::FT1ChunkSHA1Cache>());
|
||||||
|
|
||||||
const auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
|
const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
|
||||||
if (cc.haveChunk(sha1_chunk_hash)) {
|
if (cc.haveChunk(sha1_chunk_hash)) {
|
||||||
std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
|
std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
|
||||||
// we have the chunk
|
// we have the chunk
|
||||||
@ -735,7 +775,15 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
|||||||
auto idx_vec = cc.chunkIndices(sha1_chunk_hash);
|
auto idx_vec = cc.chunkIndices(sha1_chunk_hash);
|
||||||
assert(!idx_vec.empty());
|
assert(!idx_vec.empty());
|
||||||
|
|
||||||
const auto& info = ce.get<Components::FT1InfoSHA1>();
|
// CHECK IF TRANSFER IN PROGESS!!
|
||||||
|
for (const auto idx : idx_vec) {
|
||||||
|
if (_receiving_transfers.containsPeerChunk(e.group_number, e.peer_number, o, idx)) {
|
||||||
|
std::cerr << "SHA1_NGCFT1 error: " << e.group_number << ":" << e.peer_number << " offered chunk(" << idx << ") it is already receiving!!\n";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto& info = o.get<Components::FT1InfoSHA1>();
|
||||||
|
|
||||||
// TODO: check e.file_size
|
// TODO: check e.file_size
|
||||||
assert(e.file_size == info.chunkSize(idx_vec.front()));
|
assert(e.file_size == info.chunkSize(idx_vec.front()));
|
||||||
@ -743,7 +791,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
|
|||||||
_receiving_transfers.emplaceChunk(
|
_receiving_transfers.emplaceChunk(
|
||||||
e.group_number, e.peer_number,
|
e.group_number, e.peer_number,
|
||||||
e.transfer_id,
|
e.transfer_id,
|
||||||
ReceivingTransfers::Entry::Chunk{ce, idx_vec}
|
ReceivingTransfers::Entry::Chunk{o, idx_vec}
|
||||||
);
|
);
|
||||||
|
|
||||||
e.accept = true;
|
e.accept = true;
|
||||||
@ -1108,6 +1156,9 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
|
|||||||
// HACK: assume the message sender is participating. usually a safe bet.
|
// HACK: assume the message sender is participating. usually a safe bet.
|
||||||
addParticipation(c, ce);
|
addParticipation(c, ce);
|
||||||
|
|
||||||
|
// HACK: assume the message sender has all
|
||||||
|
ce.get_or_emplace<Components::RemoteHave>().others[c] = {true, {}};
|
||||||
|
|
||||||
if (!ce.all_of<Components::ReRequestInfoTimer>() && !ce.all_of<Components::FT1InfoSHA1>()) {
|
if (!ce.all_of<Components::ReRequestInfoTimer>() && !ce.all_of<Components::FT1InfoSHA1>()) {
|
||||||
// TODO: check if already receiving
|
// TODO: check if already receiving
|
||||||
_queue_content_want_info.push_back(ce);
|
_queue_content_want_info.push_back(ce);
|
||||||
@ -1419,6 +1470,8 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.remove<ChunkPicker>();
|
||||||
|
|
||||||
for (const auto& [_, o] : _info_to_content) {
|
for (const auto& [_, o] : _info_to_content) {
|
||||||
removeParticipation(c, o);
|
removeParticipation(c, o);
|
||||||
|
|
||||||
@ -1484,6 +1537,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
|
|||||||
for (const auto c_i : e.chunks) {
|
for (const auto c_i : e.chunks) {
|
||||||
if (c_i >= num_total_chunks) {
|
if (c_i >= num_total_chunks) {
|
||||||
std::cerr << "SHA1_NGCFT1 error: remote sent have with out-of-range chunk index!!!\n";
|
std::cerr << "SHA1_NGCFT1 error: remote sent have with out-of-range chunk index!!!\n";
|
||||||
|
std::cerr << info_hash << ": " << c_i << " >= " << num_total_chunks << "\n";
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,9 +88,7 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
|||||||
bool _udp_only {false};
|
bool _udp_only {false};
|
||||||
|
|
||||||
size_t _max_concurrent_in {4};
|
size_t _max_concurrent_in {4};
|
||||||
size_t _max_concurrent_out {8};
|
size_t _max_concurrent_out {4};
|
||||||
// TODO: probably also includes running transfers rn (meh)
|
|
||||||
size_t _max_pending_requests {32}; // per content
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
SHA1_NGCFT1(
|
SHA1_NGCFT1(
|
||||||
|
Loading…
Reference in New Issue
Block a user