chunk sending (untested)

This commit is contained in:
Green Sky 2023-01-16 01:21:23 +01:00
parent e1d8e9ed4c
commit 8b554bd80f
No known key found for this signature in database
3 changed files with 173 additions and 10 deletions

View File

@ -1,5 +1,6 @@
#pragma once
#include <cstddef>
#include <cstdint>
#include <array>
#include <ostream>
@ -22,6 +23,23 @@ struct SHA1Digest {
std::ostream& operator<<(std::ostream& out, const SHA1Digest& v);
namespace std { // inject
template<> struct hash<SHA1Digest> {
std::size_t operator()(const SHA1Digest& h) const noexcept {
return
size_t(h.data[0]) << (0*8) |
size_t(h.data[1]) << (1*8) |
size_t(h.data[2]) << (2*8) |
size_t(h.data[3]) << (3*8) |
size_t(h.data[4]) << (4*8) |
size_t(h.data[5]) << (5*8) |
size_t(h.data[6]) << (6*8) |
size_t(h.data[7]) << (7*8)
;
}
};
} // std
struct FTInfoSHA1 {
std::string file_name;
uint64_t file_size {0};

View File

@ -3,6 +3,7 @@
#include "../tox_client.hpp"
#include <iostream>
#include <tuple>
namespace States {
@ -23,12 +24,19 @@ SHA1::SHA1(
_have_chunk(std::move(have_chunk))
{
_have_all = true;
_have_count = 0;
for (const bool it : _have_chunk) {
if (!it) {
_have_all = false;
break;
} else {
_have_count++;
}
}
// build lookup table
for (size_t i = 0; i < _sha1_info.chunks.size(); i++) {
_chunk_hash_to_index[_sha1_info.chunks[i]] = i;
}
}
bool SHA1::iterate(float delta) {
@ -46,6 +54,32 @@ bool SHA1::iterate(float delta) {
it++;
}
}
// chunk sending
for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end();) {
float& time_since_remove_activity = std::get<float>(*it);
time_since_remove_activity += delta;
// if we have not heard for 10sec, timeout
if (time_since_remove_activity >= 10.f) {
std::cerr << "SHA1 sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << std::get<2>(*it) << "\n";
it = _transfers_sending_chunk.erase(it);
} else {
it++;
}
}
// chunk receiving
for (auto it = _transfers_receiving_chunk.begin(); it != _transfers_receiving_chunk.end();) {
float& time_since_remove_activity = std::get<float>(*it);
time_since_remove_activity += delta;
// if we have not heard for 10sec, timeout
if (time_since_remove_activity >= 10.f) {
std::cerr << "SHA1 receiving chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << std::get<2>(*it) << "\n";
it = _transfers_receiving_chunk.erase(it);
} else {
it++;
}
}
// if we have not reached the total cap for transfers
if (true) {
@ -73,7 +107,34 @@ bool SHA1::iterate(float delta) {
});
_queue_requested_info.pop_front();
} else if (false) { // then check for chunk requests
} else if (!_queue_requested_chunk.empty()) { // then check for chunk requests
const auto [group_number, peer_number, chunk_hash] = _queue_requested_chunk.front();
size_t chunk_index = chunkIndex(chunk_hash).value();
size_t chunk_file_size = _sha1_info.chunk_size;
if (chunk_index+1 == _sha1_info.chunks.size()) {
// last chunk
chunk_file_size = _sha1_info.file_size - chunk_index * _sha1_info.chunk_size;
}
uint8_t transfer_id {0};
_tcl.sendFT1InitPrivate(
group_number, peer_number,
NGC_FT1_file_kind::HASH_SHA1_CHUNK,
chunk_hash.data.data(), chunk_hash.size(), // id (chunk hash)
chunk_file_size,
transfer_id
);
_transfers_sending_chunk.push_back({
group_number, peer_number,
transfer_id,
0.f,
chunk_index
});
_queue_requested_chunk.pop_front();
}
}
@ -151,14 +212,18 @@ void SHA1::onFT1SendDataSHA1Info(uint32_t group_number, uint32_t peer_number, ui
// sha1_chunk
void SHA1::onFT1ReceiveRequestSHA1Chunk(uint32_t group_number, uint32_t peer_number, const uint8_t* file_id, size_t file_id_size) {
#if 0
bool have {false};
if (_have_all) {
have = _have_all;
} else if (haveChunk(xxx)) {
have = true;
if (file_id_size != 20) {
std::cerr << "SHA1 got request for sha1_chunk of wrong size!!\n";
return;
}
#endif
SHA1Digest requested_hash(file_id, file_id_size);
if (!haveChunk(requested_hash)) {
// we dont have, ignore
return;
}
queueUpRequestChunk(group_number, peer_number, requested_hash);
}
bool SHA1::onFT1ReceiveInitSHA1Chunk(uint32_t group_number, uint32_t peer_number, const uint8_t* file_id, size_t file_id_size, const uint8_t transfer_id, const size_t file_size) {
@ -169,6 +234,35 @@ void SHA1::onFT1ReceiveDataSHA1Chunk(uint32_t group_number, uint32_t peer_number
}
void SHA1::onFT1SendDataSHA1Chunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, size_t data_offset, uint8_t* data, size_t data_size) {
// TODO: sub optimal
for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end(); it++) {
if (std::get<0>(*it) == group_number && std::get<1>(*it) == peer_number && std::get<2>(*it) == transfer_id) {
std::get<float>(*it) = 0.f; // time
const size_t chunk_index = std::get<4>(*it);
size_t file_offset = chunk_index * _sha1_info.chunk_size;
// TODO: optimize
for (size_t i = 0; i < data_size; i++) {
data[i] = _file_map[file_offset+data_offset+i];
}
size_t chunk_file_size = _sha1_info.chunk_size;
if (chunk_index+1 == _sha1_info.chunks.size()) {
// last chunk
chunk_file_size = _sha1_info.file_size - chunk_index * _sha1_info.chunk_size;
}
// if last data block
if (data_offset + data_size == chunk_file_size) {
std::cout << "SHA1 chunk sent " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << std::get<2>(*it) << " " << chunk_index << "\n";
_transfers_sending_chunk.erase(it);
}
break;
}
}
}
void SHA1::queueUpRequestInfo(uint32_t group_number, uint32_t peer_number) {
@ -191,5 +285,41 @@ void SHA1::queueUpRequestInfo(uint32_t group_number, uint32_t peer_number) {
_queue_requested_info.push_back(std::make_pair(group_number, peer_number));
}
void SHA1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash) {
// TODO: transfers
for (auto& [i_g, i_p, i_h] : _queue_requested_chunk) {
// if allready in queue
if (i_g == group_number && i_p == peer_number && i_h == hash) {
return;
}
}
// not in queue yet
_queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, hash));
}
std::optional<size_t> SHA1::chunkIndex(const SHA1Digest& hash) const {
const auto it = _chunk_hash_to_index.find(hash);
if (it != _chunk_hash_to_index.cend()) {
return it->second;
} else {
return std::nullopt;
}
}
bool SHA1::haveChunk(const SHA1Digest& hash) const {
if (_have_all) {
return true;
}
if (auto i_opt = chunkIndex(hash); i_opt.has_value()) {
return _have_chunk[i_opt.value()];
}
// not part of this file
return false;
}
} // States

View File

@ -6,6 +6,7 @@
#include <mio/mio.hpp>
#include <unordered_map>
#include <vector>
#include <deque>
@ -46,6 +47,10 @@ struct SHA1 final : public StateI {
// avoids duplicates
// clears timer if exists
void queueUpRequestInfo(uint32_t group_number, uint32_t peer_number);
void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash);
std::optional<size_t> chunkIndex(const SHA1Digest& hash) const;
bool haveChunk(const SHA1Digest& hash) const;
private:
mio::mmap_sink _file_map; // writable if not all
@ -56,11 +61,21 @@ struct SHA1 final : public StateI {
// index is the same as for info
std::vector<bool> _have_chunk;
bool _have_all {false};
size_t _have_count {0};
std::unordered_map<SHA1Digest, size_t> _chunk_hash_to_index;
// group_number, peer_number
std::deque<std::pair<uint32_t, uint32_t>> _queue_requested_info;
// group_number, peer_number, transfer_id, second since (remote) activity
// group_number, peer_number, transfer_id, seconds since (remote) activity
std::vector<std::tuple<uint32_t, uint32_t, uint8_t, float>> _transfers_requested_info;
// group_number, peer_number, chunk_hash
std::deque<std::tuple<uint32_t, uint32_t, SHA1Digest>> _queue_requested_chunk;
// group_number, peer_number, transfer_id(i/o), seconds since (remote) activity, chunk index
std::vector<std::tuple<uint32_t, uint32_t, uint8_t, float, size_t>> _transfers_sending_chunk;
std::vector<std::tuple<uint32_t, uint32_t, uint8_t, float, size_t>> _transfers_receiving_chunk;
};
} // States