transfers kinda work now. request algo broken tho
This commit is contained in:
parent
8b554bd80f
commit
e89f1be660
@ -43,7 +43,7 @@ namespace std { // inject
|
|||||||
struct FTInfoSHA1 {
|
struct FTInfoSHA1 {
|
||||||
std::string file_name;
|
std::string file_name;
|
||||||
uint64_t file_size {0};
|
uint64_t file_size {0};
|
||||||
static constexpr size_t chunk_size {4*1024}; // 4KiB for now
|
static constexpr size_t chunk_size {64*1024}; // 64KiB for now
|
||||||
std::vector<SHA1Digest> chunks;
|
std::vector<SHA1Digest> chunks;
|
||||||
|
|
||||||
std::vector<uint8_t> toBuffer(void) const;
|
std::vector<uint8_t> toBuffer(void) const;
|
||||||
|
@ -2,6 +2,8 @@
|
|||||||
|
|
||||||
#include "../tox_client.hpp"
|
#include "../tox_client.hpp"
|
||||||
|
|
||||||
|
#include "../hash_utils.hpp"
|
||||||
|
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <tuple>
|
#include <tuple>
|
||||||
|
|
||||||
@ -23,16 +25,21 @@ SHA1::SHA1(
|
|||||||
_sha1_info_hash(std::move(sha1_info_hash)),
|
_sha1_info_hash(std::move(sha1_info_hash)),
|
||||||
_have_chunk(std::move(have_chunk))
|
_have_chunk(std::move(have_chunk))
|
||||||
{
|
{
|
||||||
|
assert(_have_chunk.size() == _sha1_info.chunks.size());
|
||||||
|
|
||||||
_have_all = true;
|
_have_all = true;
|
||||||
_have_count = 0;
|
_have_count = 0;
|
||||||
for (const bool it : _have_chunk) {
|
for (size_t i = 0; i < _have_chunk.size(); i++) {
|
||||||
if (!it) {
|
if (_have_chunk[i]) {
|
||||||
_have_all = false;
|
|
||||||
} else {
|
|
||||||
_have_count++;
|
_have_count++;
|
||||||
|
} else {
|
||||||
|
_have_all = false;
|
||||||
|
_chunk_want_queue.push_back(i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if not sequential, shuffle _chunk_want_queue
|
||||||
|
|
||||||
// build lookup table
|
// build lookup table
|
||||||
for (size_t i = 0; i < _sha1_info.chunks.size(); i++) {
|
for (size_t i = 0; i < _sha1_info.chunks.size(); i++) {
|
||||||
_chunk_hash_to_index[_sha1_info.chunks[i]] = i;
|
_chunk_hash_to_index[_sha1_info.chunks[i]] = i;
|
||||||
@ -40,49 +47,50 @@ SHA1::SHA1(
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool SHA1::iterate(float delta) {
|
bool SHA1::iterate(float delta) {
|
||||||
// do ongoing transfers, send data?, timeout
|
{ // timer and timeouts
|
||||||
// info
|
// info
|
||||||
for (auto it = _transfers_requested_info.begin(); it != _transfers_requested_info.end();) {
|
for (auto it = _transfers_requested_info.begin(); it != _transfers_requested_info.end();) {
|
||||||
float& time_since_remove_activity = std::get<float>(*it);
|
float& time_since_remove_activity = std::get<float>(*it);
|
||||||
time_since_remove_activity += delta;
|
time_since_remove_activity += delta;
|
||||||
|
|
||||||
// if we have not heard for 10sec, timeout
|
// if we have not heard for 10sec, timeout
|
||||||
if (time_since_remove_activity >= 10.f) {
|
if (time_since_remove_activity >= 10.f) {
|
||||||
std::cerr << "SHA1 info tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << std::get<2>(*it) << "\n";
|
std::cerr << "SHA1 info tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n";
|
||||||
it = _transfers_requested_info.erase(it);
|
it = _transfers_requested_info.erase(it);
|
||||||
} else {
|
} else {
|
||||||
it++;
|
it++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
// chunk sending
|
||||||
// chunk sending
|
for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end();) {
|
||||||
for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end();) {
|
float& time_since_remove_activity = std::get<float>(*it);
|
||||||
float& time_since_remove_activity = std::get<float>(*it);
|
time_since_remove_activity += delta;
|
||||||
time_since_remove_activity += delta;
|
|
||||||
|
|
||||||
// if we have not heard for 10sec, timeout
|
// if we have not heard for 10sec, timeout
|
||||||
if (time_since_remove_activity >= 10.f) {
|
if (time_since_remove_activity >= 10.f) {
|
||||||
std::cerr << "SHA1 sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << std::get<2>(*it) << "\n";
|
std::cerr << "SHA1 sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n";
|
||||||
it = _transfers_sending_chunk.erase(it);
|
it = _transfers_sending_chunk.erase(it);
|
||||||
} else {
|
} else {
|
||||||
it++;
|
it++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
// chunk receiving
|
||||||
// chunk receiving
|
for (auto it = _transfers_receiving_chunk.begin(); it != _transfers_receiving_chunk.end();) {
|
||||||
for (auto it = _transfers_receiving_chunk.begin(); it != _transfers_receiving_chunk.end();) {
|
float& time_since_remove_activity = std::get<float>(*it);
|
||||||
float& time_since_remove_activity = std::get<float>(*it);
|
time_since_remove_activity += delta;
|
||||||
time_since_remove_activity += delta;
|
|
||||||
|
|
||||||
// if we have not heard for 10sec, timeout
|
// if we have not heard for 10sec, timeout
|
||||||
if (time_since_remove_activity >= 10.f) {
|
if (time_since_remove_activity >= 10.f) {
|
||||||
std::cerr << "SHA1 receiving chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << std::get<2>(*it) << "\n";
|
std::cerr << "SHA1 receiving chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n";
|
||||||
it = _transfers_receiving_chunk.erase(it);
|
it = _transfers_receiving_chunk.erase(it);
|
||||||
} else {
|
} else {
|
||||||
it++;
|
it++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we have not reached the total cap for transfers
|
// if we have not reached the total cap for transfers
|
||||||
if (true) {
|
if (_transfers_requested_info.size() + _transfers_sending_chunk.size() < _max_concurrent_out) {
|
||||||
// for each peer? transfer cap per peer?
|
// for each peer? transfer cap per peer?
|
||||||
|
|
||||||
// first check requests for info
|
// first check requests for info
|
||||||
@ -111,11 +119,7 @@ bool SHA1::iterate(float delta) {
|
|||||||
const auto [group_number, peer_number, chunk_hash] = _queue_requested_chunk.front();
|
const auto [group_number, peer_number, chunk_hash] = _queue_requested_chunk.front();
|
||||||
|
|
||||||
size_t chunk_index = chunkIndex(chunk_hash).value();
|
size_t chunk_index = chunkIndex(chunk_hash).value();
|
||||||
size_t chunk_file_size = _sha1_info.chunk_size;
|
size_t chunk_file_size = chunkSize(chunk_index);
|
||||||
if (chunk_index+1 == _sha1_info.chunks.size()) {
|
|
||||||
// last chunk
|
|
||||||
chunk_file_size = _sha1_info.file_size - chunk_index * _sha1_info.chunk_size;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint8_t transfer_id {0};
|
uint8_t transfer_id {0};
|
||||||
|
|
||||||
@ -138,6 +142,33 @@ bool SHA1::iterate(float delta) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!_have_all && !_chunk_want_queue.empty() && _transfers_receiving_chunk.size() < _max_concurrent_in) {
|
||||||
|
// send out request, no burst tho
|
||||||
|
std::vector<std::pair<uint32_t, uint32_t>> target_peers;
|
||||||
|
_tcl.forEachGroup([&target_peers, this](uint32_t group_number) {
|
||||||
|
_tcl.forEachGroupPeer(group_number, [&target_peers, group_number](uint32_t peer_number) {
|
||||||
|
target_peers.push_back({group_number, peer_number});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!target_peers.empty()) {
|
||||||
|
//if (_distrib.max() != target_peers.size()) {
|
||||||
|
//std::uniform_int_distribution<size_t> new_dist{0, target_peers.size()-1};
|
||||||
|
//_distrib.param(new_dist.param());
|
||||||
|
//}
|
||||||
|
|
||||||
|
//size_t target_index = _distrib(_rng);
|
||||||
|
size_t target_index = _rng()%target_peers.size();
|
||||||
|
auto [group_number, peer_number] = target_peers.at(target_index);
|
||||||
|
|
||||||
|
size_t chunk_index = _chunk_want_queue.front();
|
||||||
|
_chunks_requested.emplace(chunk_index);
|
||||||
|
_chunk_want_queue.pop_front();
|
||||||
|
|
||||||
|
_tcl.sendFT1RequestPrivate(group_number, peer_number, NGC_FT1_file_kind::HASH_SHA1_CHUNK, _sha1_info.chunks[chunk_index].data.data(), 20);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: unmap and remap the file every couple of minutes to keep ram usage down?
|
// TODO: unmap and remap the file every couple of minutes to keep ram usage down?
|
||||||
// TODO: when to stop?
|
// TODO: when to stop?
|
||||||
return false;
|
return false;
|
||||||
@ -227,10 +258,89 @@ void SHA1::onFT1ReceiveRequestSHA1Chunk(uint32_t group_number, uint32_t peer_num
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool SHA1::onFT1ReceiveInitSHA1Chunk(uint32_t group_number, uint32_t peer_number, const uint8_t* file_id, size_t file_id_size, const uint8_t transfer_id, const size_t file_size) {
|
bool SHA1::onFT1ReceiveInitSHA1Chunk(uint32_t group_number, uint32_t peer_number, const uint8_t* file_id, size_t file_id_size, const uint8_t transfer_id, const size_t file_size) {
|
||||||
return false;
|
if (_transfers_receiving_chunk.size() >= _max_concurrent_in) {
|
||||||
|
// reject, max tf in
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (file_id_size != 20) {
|
||||||
|
std::cerr << "SHA1 got request for sha1_chunk of wrong size!!\n";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SHA1Digest incomming_hash(file_id, file_id_size);
|
||||||
|
|
||||||
|
if (haveChunk(incomming_hash)) {
|
||||||
|
std::cout << "SHA1 ignoring init for chunk we allready have " << incomming_hash << "\n";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto chunk_i_opt = chunkIndex(incomming_hash);
|
||||||
|
|
||||||
|
if (!chunk_i_opt.has_value()) {
|
||||||
|
std::cout << "SHA1 ignoring init for unrelated chunk " << incomming_hash << "\n";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t chunk_index = chunk_i_opt.value();
|
||||||
|
|
||||||
|
// check transfers
|
||||||
|
for (const auto& it : _transfers_receiving_chunk) {
|
||||||
|
if (std::get<4>(it) == chunk_index) {
|
||||||
|
// allready in transition
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_transfers_receiving_chunk.push_back(
|
||||||
|
std::make_tuple(
|
||||||
|
group_number, peer_number,
|
||||||
|
transfer_id,
|
||||||
|
0.f,
|
||||||
|
chunk_index
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SHA1::onFT1ReceiveDataSHA1Chunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, size_t data_offset, const uint8_t* data, size_t data_size) {
|
void SHA1::onFT1ReceiveDataSHA1Chunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, size_t data_offset, const uint8_t* data, size_t data_size) {
|
||||||
|
// check transfers
|
||||||
|
for (auto it = _transfers_receiving_chunk.begin(); it != _transfers_receiving_chunk.end(); it++) {
|
||||||
|
if (std::get<0>(*it) == group_number && std::get<1>(*it) == peer_number && std::get<2>(*it) == transfer_id) {
|
||||||
|
std::get<float>(*it) = 0.f; // time
|
||||||
|
|
||||||
|
const size_t chunk_index = std::get<4>(*it);
|
||||||
|
|
||||||
|
size_t file_offset = chunk_index * _sha1_info.chunk_size;
|
||||||
|
|
||||||
|
// TODO: optimize
|
||||||
|
for (size_t i = 0; i < data_size; i++) {
|
||||||
|
_file_map[file_offset+data_offset+i] = data[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t chunk_file_size = chunkSize(chunk_index);
|
||||||
|
|
||||||
|
// if last data block
|
||||||
|
if (data_offset + data_size == chunk_file_size) {
|
||||||
|
// hash and verify
|
||||||
|
SHA1Digest test_hash = hash_sha1(_file_map.data()+file_offset, chunk_file_size);
|
||||||
|
if (test_hash != _sha1_info.chunks[chunk_index]) {
|
||||||
|
std::cerr << "SHA1 received chunks's hash does not match!, discarding\n";
|
||||||
|
_transfers_receiving_chunk.erase(it);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
_have_chunk[chunk_index] = true;
|
||||||
|
_have_count++;
|
||||||
|
_have_all = _have_count == _sha1_info.chunks.size();
|
||||||
|
|
||||||
|
std::cout << "SHA1 chunk received " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << " " << chunk_index << " (" << 100.f * float(_have_count) / _sha1_info.chunks.size() << "%)\n";
|
||||||
|
_transfers_receiving_chunk.erase(it);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void SHA1::onFT1SendDataSHA1Chunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, size_t data_offset, uint8_t* data, size_t data_size) {
|
void SHA1::onFT1SendDataSHA1Chunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, size_t data_offset, uint8_t* data, size_t data_size) {
|
||||||
@ -248,15 +358,9 @@ void SHA1::onFT1SendDataSHA1Chunk(uint32_t group_number, uint32_t peer_number, u
|
|||||||
data[i] = _file_map[file_offset+data_offset+i];
|
data[i] = _file_map[file_offset+data_offset+i];
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t chunk_file_size = _sha1_info.chunk_size;
|
|
||||||
if (chunk_index+1 == _sha1_info.chunks.size()) {
|
|
||||||
// last chunk
|
|
||||||
chunk_file_size = _sha1_info.file_size - chunk_index * _sha1_info.chunk_size;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if last data block
|
// if last data block
|
||||||
if (data_offset + data_size == chunk_file_size) {
|
if (data_offset + data_size == chunkSize(chunk_index)) {
|
||||||
std::cout << "SHA1 chunk sent " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << std::get<2>(*it) << " " << chunk_index << "\n";
|
std::cout << "SHA1 chunk sent " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << " " << chunk_index << "\n";
|
||||||
_transfers_sending_chunk.erase(it);
|
_transfers_sending_chunk.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -308,6 +412,15 @@ std::optional<size_t> SHA1::chunkIndex(const SHA1Digest& hash) const {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t SHA1::chunkSize(size_t chunk_index) const {
|
||||||
|
if (chunk_index+1 == _sha1_info.chunks.size()) {
|
||||||
|
// last chunk
|
||||||
|
return _sha1_info.file_size - chunk_index * _sha1_info.chunk_size;
|
||||||
|
} else {
|
||||||
|
return _sha1_info.chunk_size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool SHA1::haveChunk(const SHA1Digest& hash) const {
|
bool SHA1::haveChunk(const SHA1Digest& hash) const {
|
||||||
if (_have_all) {
|
if (_have_all) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -7,8 +7,10 @@
|
|||||||
#include <mio/mio.hpp>
|
#include <mio/mio.hpp>
|
||||||
|
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
#include <set>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
|
#include <random>
|
||||||
|
|
||||||
namespace States {
|
namespace States {
|
||||||
|
|
||||||
@ -50,6 +52,7 @@ struct SHA1 final : public StateI {
|
|||||||
void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash);
|
void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash);
|
||||||
|
|
||||||
std::optional<size_t> chunkIndex(const SHA1Digest& hash) const;
|
std::optional<size_t> chunkIndex(const SHA1Digest& hash) const;
|
||||||
|
size_t chunkSize(size_t chunk_index) const;
|
||||||
bool haveChunk(const SHA1Digest& hash) const;
|
bool haveChunk(const SHA1Digest& hash) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -62,6 +65,14 @@ struct SHA1 final : public StateI {
|
|||||||
std::vector<bool> _have_chunk;
|
std::vector<bool> _have_chunk;
|
||||||
bool _have_all {false};
|
bool _have_all {false};
|
||||||
size_t _have_count {0};
|
size_t _have_count {0};
|
||||||
|
std::deque<size_t> _chunk_want_queue;
|
||||||
|
std::set<size_t> _chunks_requested;
|
||||||
|
|
||||||
|
const size_t _max_concurrent_out {4};
|
||||||
|
const size_t _max_concurrent_in {4};
|
||||||
|
|
||||||
|
std::minstd_rand _rng {1337};
|
||||||
|
std::uniform_int_distribution<size_t> _distrib;
|
||||||
|
|
||||||
std::unordered_map<SHA1Digest, size_t> _chunk_hash_to_index;
|
std::unordered_map<SHA1Digest, size_t> _chunk_hash_to_index;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user