fix chunk want duplicate hashes, outgoing requests are now prioritized torwards peers we get more data from
This commit is contained in:
parent
294a427eb0
commit
613ae14530
@ -4,6 +4,7 @@
|
||||
|
||||
#include "../hash_utils.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
#include <tuple>
|
||||
#include <random>
|
||||
@ -32,6 +33,12 @@ SHA1::SHA1(
|
||||
_max_concurrent_in = cl.max_incoming_transfers;
|
||||
_max_concurrent_out = cl.max_incoming_transfers;
|
||||
|
||||
// build lookup table
|
||||
for (size_t i = _sha1_info.chunks.size(); i > 0; i--) {
|
||||
// chunks can have more then 1 index ..., for now, build reverse and have the first index be the real index
|
||||
_chunk_hash_to_index[_sha1_info.chunks.at(i-1)] = i-1;
|
||||
}
|
||||
|
||||
_have_all = true;
|
||||
_have_count = 0;
|
||||
for (size_t i = 0; i < _have_chunk.size(); i++) {
|
||||
@ -39,15 +46,24 @@ SHA1::SHA1(
|
||||
_have_count++;
|
||||
} else {
|
||||
_have_all = false;
|
||||
|
||||
// avoid same chunk hash dups
|
||||
if (_chunk_hash_to_index.at(_sha1_info.chunks.at(i)) == i) {
|
||||
_chunk_want_queue.push_back(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!_have_all) {
|
||||
assert(_chunk_want_queue.size() > 0);
|
||||
|
||||
{ // load last chunk first :)
|
||||
size_t tmp = _chunk_want_queue.back();
|
||||
_chunk_want_queue.push_front(tmp);
|
||||
_chunk_want_queue.pop_back();
|
||||
}
|
||||
|
||||
// if not sequential, shuffle _chunk_want_queue
|
||||
|
||||
// build lookup table
|
||||
for (size_t i = 0; i < _sha1_info.chunks.size(); i++) {
|
||||
_chunk_hash_to_index[_sha1_info.chunks[i]] = i;
|
||||
}
|
||||
}
|
||||
|
||||
@ -172,42 +188,66 @@ bool SHA1::iterate(float delta) {
|
||||
}
|
||||
}
|
||||
|
||||
if (!_have_all && !_chunk_want_queue.empty() && _chunks_requested.size() + _transfers_receiving_chunk.size() < _max_concurrent_in) {
|
||||
// send out request, no burst tho
|
||||
std::vector<std::pair<uint32_t, uint32_t>> target_peers;
|
||||
std::vector<std::pair<uint32_t, uint32_t>> target_peers_tcp;
|
||||
_tcl.forEachGroup([&target_peers, &target_peers_tcp, this](uint32_t group_number) {
|
||||
_tcl.forEachGroupPeer(group_number, [&target_peers, &target_peers_tcp, group_number](uint32_t peer_number, Tox_Connection connection_status) {
|
||||
if (connection_status == Tox_Connection::TOX_CONNECTION_UDP) {
|
||||
target_peers.push_back({group_number, peer_number});
|
||||
} else {
|
||||
target_peers_tcp.push_back({group_number, peer_number});
|
||||
// update speeds and targets
|
||||
_peer_speed_mesurement_interval_timer += delta;
|
||||
if (_peer_speed_mesurement_interval_timer >= _peer_speed_mesurement_interval) {
|
||||
_peer_speed_mesurement_interval_timer = 0.f; // we lose some time here, but precision is not the issue
|
||||
|
||||
_peer_in_bytes_array_index = (_peer_in_bytes_array_index + 1) % _peer_speed_mesurement_interval_count;
|
||||
for (const auto& [peer, array] : _peer_in_bytes_array) {
|
||||
float avg {0.f};
|
||||
for (size_t i = 0; i < array.size(); i++) {
|
||||
avg += array[i];
|
||||
}
|
||||
|
||||
// if 6 mesurment every 0.5sec -> avg is over 3sec -> /3 for /s
|
||||
avg /= _peer_speed_mesurement_interval * _peer_speed_mesurement_interval_count;
|
||||
|
||||
// reset byte count for next round
|
||||
_peer_in_bytes_array[peer][_peer_in_bytes_array_index] = 0;
|
||||
|
||||
_peer_in_speed[peer] = avg;
|
||||
}
|
||||
|
||||
_peer_in_targets.clear();
|
||||
_tcl.forEachGroup([this](uint32_t group_number) {
|
||||
_tcl.forEachGroupPeer(group_number, [group_number, this](uint32_t peer_number, Tox_Connection connection_status) {
|
||||
if (connection_status == Tox_Connection::TOX_CONNECTION_UDP || !_udp_only) {
|
||||
_peer_in_targets.push_back({group_number, peer_number});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
if (!(target_peers.empty() && target_peers_tcp.empty())) {
|
||||
if (!_peer_in_targets.empty()) {
|
||||
std::vector<double> weights;
|
||||
for (const auto& peer : _peer_in_targets) {
|
||||
if (_peer_in_speed.count(peer)) {
|
||||
weights.push_back(
|
||||
std::clamp(
|
||||
(_peer_in_speed.at(peer) / 1024.f) // KiB/s
|
||||
* (20.f/500.f), // map to a range from 0 to 20, max at 500KiB/s
|
||||
1.f,
|
||||
20.f
|
||||
)
|
||||
);
|
||||
} else {
|
||||
weights.push_back(1.f);
|
||||
}
|
||||
}
|
||||
|
||||
std::discrete_distribution<size_t> tmp_dist{weights.cbegin(), weights.cend()};
|
||||
_peer_in_targets_dist.param(tmp_dist.param());
|
||||
}
|
||||
}
|
||||
|
||||
if (!_have_all && !_peer_in_targets.empty() && !_chunk_want_queue.empty() && _chunks_requested.size() + _transfers_receiving_chunk.size() < _max_concurrent_in) {
|
||||
// send out request, no burst tho
|
||||
uint32_t group_number;
|
||||
uint32_t peer_number;
|
||||
|
||||
if (!target_peers.empty() && !target_peers_tcp.empty()) { // have udp & tcp peers
|
||||
// 75% chance to roll udp over tcp
|
||||
if (std::generate_canonical<float, 10>(_rng) >= 0.25f) {
|
||||
//std::cout << "rolled upd\n";
|
||||
size_t target_index = _rng()%target_peers.size();
|
||||
std::tie(group_number, peer_number) = target_peers.at(target_index);
|
||||
} else { // tcp
|
||||
//std::cout << "rolled tcp\n";
|
||||
size_t target_index = _rng()%target_peers_tcp.size();
|
||||
std::tie(group_number, peer_number) = target_peers_tcp.at(target_index);
|
||||
}
|
||||
} else if (!target_peers.empty()) { // udp
|
||||
size_t target_index = _rng()%target_peers.size();
|
||||
std::tie(group_number, peer_number) = target_peers.at(target_index);
|
||||
} else { // tcp
|
||||
size_t target_index = _rng()%target_peers_tcp.size();
|
||||
std::tie(group_number, peer_number) = target_peers_tcp.at(target_index);
|
||||
}
|
||||
//size_t target_index = _rng()%target_peers.size();
|
||||
size_t target_index = _peer_in_targets_dist(_rng);
|
||||
std::tie(group_number, peer_number) = _peer_in_targets.at(target_index);
|
||||
|
||||
size_t chunk_index = _chunk_want_queue.front();
|
||||
_chunks_requested[chunk_index] = 0.f;
|
||||
@ -216,7 +256,6 @@ bool SHA1::iterate(float delta) {
|
||||
_tcl.sendFT1RequestPrivate(group_number, peer_number, NGC_FT1_file_kind::HASH_SHA1_CHUNK, _sha1_info.chunks[chunk_index].data.data(), 20);
|
||||
//std::cout << "sent request " << group_number << ":" << peer_number << "\n";
|
||||
}
|
||||
}
|
||||
|
||||
// log
|
||||
_io_log_timer += delta;
|
||||
@ -237,6 +276,10 @@ bool SHA1::iterate(float delta) {
|
||||
std::cout << "SHA1 total down: " << _bytes_down / 1024 << "KiB up: " << _bytes_up / 1024 << "KiB\n";
|
||||
|
||||
std::cout << "SHA1 cwq:" << _chunk_want_queue.size() << " cwqr:" << _chunks_requested.size() << " trc:" << _transfers_receiving_chunk.size() << " tsc:" << _transfers_sending_chunk.size() << "\n";
|
||||
std::cout << "SHA1 peer down speeds:\n";
|
||||
for (const auto& [peer, speed] : _peer_in_speed) {
|
||||
std::cout << " " << peer.first << ":" << peer.second << "(" << _tcl.getGroupPeerName(peer.first, peer.second) << ")" << "\t" << speed / 1024.f << "KiB/s\n";
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: unmap and remap the file every couple of minutes to keep ram usage down?
|
||||
@ -383,6 +426,8 @@ void SHA1::onFT1ReceiveDataSHA1Chunk(uint32_t group_number, uint32_t peer_number
|
||||
if (std::get<0>(*it) == group_number && std::get<1>(*it) == peer_number && std::get<2>(*it) == transfer_id) {
|
||||
_bytes_down += data_size;
|
||||
|
||||
_peer_in_bytes_array[std::make_pair(group_number, peer_number)][_peer_in_bytes_array_index] += data_size;
|
||||
|
||||
std::get<float>(*it) = 0.f; // time
|
||||
|
||||
const size_t chunk_index = std::get<4>(*it);
|
||||
@ -402,6 +447,7 @@ void SHA1::onFT1ReceiveDataSHA1Chunk(uint32_t group_number, uint32_t peer_number
|
||||
SHA1Digest test_hash = hash_sha1(_file_map.data()+file_offset, chunk_file_size);
|
||||
if (test_hash != _sha1_info.chunks[chunk_index]) {
|
||||
std::cerr << "SHA1 received chunks's hash does not match!, discarding\n";
|
||||
_bytes_down -= chunk_file_size; // penalize wrong data
|
||||
_transfers_receiving_chunk.erase(it);
|
||||
_chunk_want_queue.push_front(chunk_index); // put back in queue
|
||||
break;
|
||||
|
@ -58,6 +58,8 @@ struct SHA1 final : public StateI {
|
||||
bool haveChunk(const SHA1Digest& hash) const;
|
||||
|
||||
private:
|
||||
bool _udp_only {false};
|
||||
|
||||
mio::mmap_sink _file_map; // writable if not all
|
||||
const FTInfoSHA1 _sha1_info;
|
||||
const std::vector<uint8_t> _sha1_info_data;
|
||||
@ -96,6 +98,23 @@ struct SHA1 final : public StateI {
|
||||
// group_number, peer_number, transfer_id(i/o), seconds since (remote) activity, chunk index
|
||||
std::vector<std::tuple<uint32_t, uint32_t, uint8_t, float, size_t>> _transfers_sending_chunk;
|
||||
std::vector<std::tuple<uint32_t, uint32_t, uint8_t, float, size_t>> _transfers_receiving_chunk;
|
||||
|
||||
static constexpr size_t _peer_speed_mesurement_interval_count {20};
|
||||
const float _peer_speed_mesurement_interval {0.5f}; // seconds
|
||||
float _peer_speed_mesurement_interval_timer {0.f}; // seconds
|
||||
// bytes received for last 6 intervals for peer
|
||||
std::map<std::pair<uint32_t, uint32_t>, std::array<int64_t, _peer_speed_mesurement_interval_count>> _peer_in_bytes_array;
|
||||
size_t _peer_in_bytes_array_index {0}; // current index into _peer_in_bytes_array. !ringbuffer!
|
||||
// when chunk data is received, it is added to _peer_in_bytes_array_index in _peer_in_bytes_array
|
||||
// every _peer_speed_mesurement_interval the avg is calculed and written to _peer_in_speed
|
||||
// and the _peer_in_bytes_array_index is incremented by 1
|
||||
std::map<std::pair<uint32_t, uint32_t>, float> _peer_in_speed;
|
||||
// speed might be not the actual speed, since wrong data is removed afterwards (on "completion")
|
||||
// so it can get negative. this makes this more useful for peer selection, less for userfacing stats
|
||||
// _peer_in_speed feeds directly into _peer_in_targets_dist
|
||||
std::vector<std::pair<uint32_t, uint32_t>> _peer_in_targets;
|
||||
std::discrete_distribution<size_t> _peer_in_targets_dist;
|
||||
|
||||
};
|
||||
|
||||
} // States
|
||||
|
@ -21,6 +21,11 @@ void self_connection_status_cb(Tox*, TOX_CONNECTION connection_status, void *use
|
||||
void friend_request_cb(Tox*, const uint8_t *public_key, const uint8_t *message, size_t length, void *user_data) {
|
||||
static_cast<ToxClient*>(user_data)->onToxFriendRequest(public_key, std::string_view{reinterpret_cast<const char*>(message), length});
|
||||
}
|
||||
|
||||
void group_peer_name_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* name, size_t length, void *user_data) {
|
||||
static_cast<ToxClient*>(user_data)->onToxGroupPeerName(group_number, peer_id, std::string_view{reinterpret_cast<const char*>(name), length});
|
||||
}
|
||||
|
||||
void group_custom_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t *data, size_t length, void *user_data) {
|
||||
static_cast<ToxClient*>(user_data)->onToxGroupCustomPacket(group_number, peer_id, data, length);
|
||||
}
|
||||
|
@ -19,7 +19,7 @@ void friend_request_cb(Tox *tox, const uint8_t *public_key, const uint8_t *messa
|
||||
//static void friend_message_cb(Tox *tox, uint32_t friend_number, TOX_MESSAGE_TYPE type, const uint8_t *message, size_t length, void *user_data);
|
||||
|
||||
// ngc
|
||||
//static void group_peer_name_cb(Tox *tox, uint32_t group_number, uint32_t peer_id, const uint8_t *name, size_t length, void *user_data);
|
||||
void group_peer_name_cb(Tox *tox, uint32_t group_number, uint32_t peer_id, const uint8_t *name, size_t length, void *user_data);
|
||||
//static void group_peer_status_cb(Tox *tox, uint32_t group_number, uint32_t peer_id, Tox_User_Status status, void *user_data);
|
||||
//static void group_topic_cb(Tox *tox, uint32_t group_number, uint32_t peer_id, const uint8_t *topic, size_t length, void *user_data);
|
||||
//static void group_privacy_state_cb(Tox *tox, uint32_t group_number, Tox_Group_Privacy_State privacy_state, void *user_data);
|
||||
|
@ -105,6 +105,7 @@ ToxClient::ToxClient(const CommandLine& cl) :
|
||||
//CALLBACK_REG(friend_lossy_packet);
|
||||
//CALLBACK_REG(friend_lossless_packet);
|
||||
|
||||
TOX_CALLBACK_REG(group_peer_name);
|
||||
TOX_CALLBACK_REG(group_custom_packet);
|
||||
TOX_CALLBACK_REG(group_custom_private_packet);
|
||||
TOX_CALLBACK_REG(group_invite);
|
||||
@ -229,6 +230,14 @@ std::string ToxClient::getOwnAddress(void) const {
|
||||
return bin2hex(self_addr);
|
||||
}
|
||||
|
||||
std::string_view ToxClient::getGroupPeerName(uint32_t group_number, uint32_t peer_number) const {
|
||||
if (_groups.count(group_number) && _groups.at(group_number).count(peer_number)) {
|
||||
return _groups.at(group_number).at(peer_number).name;
|
||||
} else {
|
||||
return {""};
|
||||
}
|
||||
}
|
||||
|
||||
void ToxClient::onToxSelfConnectionStatus(TOX_CONNECTION connection_status) {
|
||||
std::cout << "TCL self status: ";
|
||||
switch (connection_status) {
|
||||
@ -255,6 +264,20 @@ void ToxClient::onToxFriendRequest(const uint8_t* public_key, std::string_view m
|
||||
_tox_profile_dirty = true;
|
||||
}
|
||||
|
||||
void ToxClient::onToxGroupPeerName(uint32_t group_number, uint32_t peer_id, std::string_view name) {
|
||||
std::cout << "TCL peer " << group_number << ":" << peer_id << " is now known as " << name << "\n";
|
||||
_groups[group_number][peer_id].name = name;
|
||||
}
|
||||
|
||||
void ToxClient::onToxGroupPeerConnection(uint32_t group_number, uint32_t peer_id, TOX_CONNECTION connection_status) {
|
||||
std::cout << "TCL peer " << group_number << ":" << peer_id << " status: ";
|
||||
switch (connection_status) {
|
||||
case TOX_CONNECTION::TOX_CONNECTION_NONE: std::cout << "offline\n"; break;
|
||||
case TOX_CONNECTION::TOX_CONNECTION_TCP: std::cout << "TCP-relayed\n"; break;
|
||||
case TOX_CONNECTION::TOX_CONNECTION_UDP: std::cout << "UDP-direct\n"; break;
|
||||
}
|
||||
}
|
||||
|
||||
void ToxClient::onToxGroupCustomPacket(uint32_t group_number, uint32_t peer_id, const uint8_t *data, size_t length) {
|
||||
// TODO: signal private?
|
||||
NGC_EXT_handle_group_custom_packet(_tox, _ext_ctx, group_number, peer_id, data, length);
|
||||
@ -274,8 +297,23 @@ void ToxClient::onToxGroupInvite(uint32_t friend_number, const uint8_t* invite_d
|
||||
|
||||
void ToxClient::onToxGroupPeerJoin(uint32_t group_number, uint32_t peer_id) {
|
||||
std::cout << "TCL group peer join " << group_number << ":" << peer_id << "\n";
|
||||
//_groups[group_number].emplace(peer_id);
|
||||
_groups[group_number][peer_id] = tox_group_peer_get_connection_status(_tox, group_number, peer_id, nullptr);
|
||||
|
||||
std::vector<uint8_t> tmp_name;
|
||||
{
|
||||
Tox_Err_Group_Peer_Query err;
|
||||
size_t length = tox_group_peer_get_name_size(_tox, group_number, peer_id, &err);
|
||||
if (err == Tox_Err_Group_Peer_Query::TOX_ERR_GROUP_PEER_QUERY_OK) {
|
||||
tmp_name.resize(length, '\0');
|
||||
tox_group_peer_get_name(_tox, group_number, peer_id, tmp_name.data(), nullptr);
|
||||
}
|
||||
}
|
||||
tmp_name.push_back('\0'); // make sure its null terminated
|
||||
|
||||
_groups[group_number][peer_id] = {
|
||||
tox_group_peer_get_connection_status(_tox, group_number, peer_id, nullptr),
|
||||
reinterpret_cast<const char*>(tmp_name.data())
|
||||
};
|
||||
|
||||
_tox_profile_dirty = true;
|
||||
}
|
||||
|
||||
|
@ -33,6 +33,8 @@ struct ToxClient {
|
||||
|
||||
std::string getOwnAddress(void) const;
|
||||
|
||||
std::string_view getGroupPeerName(uint32_t group_number, uint32_t peer_number) const;
|
||||
|
||||
template<typename FN>
|
||||
void forEachGroup(FN&& fn) const {
|
||||
for (const auto& it : _groups) {
|
||||
@ -43,7 +45,8 @@ struct ToxClient {
|
||||
template<typename FN>
|
||||
void forEachGroupPeer(uint32_t group_number, FN&& fn) const {
|
||||
if (_groups.count(group_number)) {
|
||||
for (const auto [peer_number, connection_status] : _groups.at(group_number)) {
|
||||
for (const auto& [peer_number, peer] : _groups.at(group_number)) {
|
||||
const auto& [connection_status, name] = peer;
|
||||
fn(peer_number, connection_status);
|
||||
}
|
||||
}
|
||||
@ -52,6 +55,8 @@ struct ToxClient {
|
||||
public: // tox callbacks
|
||||
void onToxSelfConnectionStatus(TOX_CONNECTION connection_status);
|
||||
void onToxFriendRequest(const uint8_t* public_key, std::string_view message);
|
||||
void onToxGroupPeerName(uint32_t group_number, uint32_t peer_id, std::string_view name);
|
||||
void onToxGroupPeerConnection(uint32_t group_number, uint32_t peer_id, TOX_CONNECTION connection_status);
|
||||
void onToxGroupCustomPacket(uint32_t group_number, uint32_t peer_id, const uint8_t *data, size_t length);
|
||||
void onToxGroupCustomPrivatePacket(uint32_t group_number, uint32_t peer_id, const uint8_t *data, size_t length);
|
||||
void onToxGroupInvite(uint32_t friend_number, const uint8_t* invite_data, size_t invite_length, std::string_view group_name);
|
||||
@ -87,6 +92,10 @@ struct ToxClient {
|
||||
|
||||
// key groupid, value set of peer ids
|
||||
//std::map<uint32_t, std::set<uint32_t>> _groups;
|
||||
std::map<uint32_t, std::map<uint32_t, Tox_Connection>> _groups;
|
||||
struct Peer {
|
||||
Tox_Connection connection_status {Tox_Connection::TOX_CONNECTION_NONE};
|
||||
std::string name;
|
||||
};
|
||||
std::map<uint32_t, std::map<uint32_t, Peer>> _groups;
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user