diff --git a/src/states/sha1.cpp b/src/states/sha1.cpp index 0e3ec03..12c5dc3 100644 --- a/src/states/sha1.cpp +++ b/src/states/sha1.cpp @@ -4,6 +4,7 @@ #include "../hash_utils.hpp" +#include #include #include #include @@ -32,6 +33,12 @@ SHA1::SHA1( _max_concurrent_in = cl.max_incoming_transfers; _max_concurrent_out = cl.max_incoming_transfers; + // build lookup table + for (size_t i = _sha1_info.chunks.size(); i > 0; i--) { + // chunks can have more then 1 index ..., for now, build reverse and have the first index be the real index + _chunk_hash_to_index[_sha1_info.chunks.at(i-1)] = i-1; + } + _have_all = true; _have_count = 0; for (size_t i = 0; i < _have_chunk.size(); i++) { @@ -39,15 +46,24 @@ SHA1::SHA1( _have_count++; } else { _have_all = false; - _chunk_want_queue.push_back(i); + + // avoid same chunk hash dups + if (_chunk_hash_to_index.at(_sha1_info.chunks.at(i)) == i) { + _chunk_want_queue.push_back(i); + } } } - // if not sequential, shuffle _chunk_want_queue + if (!_have_all) { + assert(_chunk_want_queue.size() > 0); - // build lookup table - for (size_t i = 0; i < _sha1_info.chunks.size(); i++) { - _chunk_hash_to_index[_sha1_info.chunks[i]] = i; + { // load last chunk first :) + size_t tmp = _chunk_want_queue.back(); + _chunk_want_queue.push_front(tmp); + _chunk_want_queue.pop_back(); + } + + // if not sequential, shuffle _chunk_want_queue } } @@ -172,52 +188,75 @@ bool SHA1::iterate(float delta) { } } - if (!_have_all && !_chunk_want_queue.empty() && _chunks_requested.size() + _transfers_receiving_chunk.size() < _max_concurrent_in) { - // send out request, no burst tho - std::vector> target_peers; - std::vector> target_peers_tcp; - _tcl.forEachGroup([&target_peers, &target_peers_tcp, this](uint32_t group_number) { - _tcl.forEachGroupPeer(group_number, [&target_peers, &target_peers_tcp, group_number](uint32_t peer_number, Tox_Connection connection_status) { - if (connection_status == Tox_Connection::TOX_CONNECTION_UDP) { - target_peers.push_back({group_number, peer_number}); - } else { - target_peers_tcp.push_back({group_number, peer_number}); + // update speeds and targets + _peer_speed_mesurement_interval_timer += delta; + if (_peer_speed_mesurement_interval_timer >= _peer_speed_mesurement_interval) { + _peer_speed_mesurement_interval_timer = 0.f; // we lose some time here, but precision is not the issue + + _peer_in_bytes_array_index = (_peer_in_bytes_array_index + 1) % _peer_speed_mesurement_interval_count; + for (const auto& [peer, array] : _peer_in_bytes_array) { + float avg {0.f}; + for (size_t i = 0; i < array.size(); i++) { + avg += array[i]; + } + + // if 6 mesurment every 0.5sec -> avg is over 3sec -> /3 for /s + avg /= _peer_speed_mesurement_interval * _peer_speed_mesurement_interval_count; + + // reset byte count for next round + _peer_in_bytes_array[peer][_peer_in_bytes_array_index] = 0; + + _peer_in_speed[peer] = avg; + } + + _peer_in_targets.clear(); + _tcl.forEachGroup([this](uint32_t group_number) { + _tcl.forEachGroupPeer(group_number, [group_number, this](uint32_t peer_number, Tox_Connection connection_status) { + if (connection_status == Tox_Connection::TOX_CONNECTION_UDP || !_udp_only) { + _peer_in_targets.push_back({group_number, peer_number}); } }); }); - if (!(target_peers.empty() && target_peers_tcp.empty())) { - uint32_t group_number; - uint32_t peer_number; - - if (!target_peers.empty() && !target_peers_tcp.empty()) { // have udp & tcp peers - // 75% chance to roll udp over tcp - if (std::generate_canonical(_rng) >= 0.25f) { - //std::cout << "rolled upd\n"; - size_t target_index = _rng()%target_peers.size(); - std::tie(group_number, peer_number) = target_peers.at(target_index); - } else { // tcp - //std::cout << "rolled tcp\n"; - size_t target_index = _rng()%target_peers_tcp.size(); - std::tie(group_number, peer_number) = target_peers_tcp.at(target_index); + if (!_peer_in_targets.empty()) { + std::vector weights; + for (const auto& peer : _peer_in_targets) { + if (_peer_in_speed.count(peer)) { + weights.push_back( + std::clamp( + (_peer_in_speed.at(peer) / 1024.f) // KiB/s + * (20.f/500.f), // map to a range from 0 to 20, max at 500KiB/s + 1.f, + 20.f + ) + ); + } else { + weights.push_back(1.f); } - } else if (!target_peers.empty()) { // udp - size_t target_index = _rng()%target_peers.size(); - std::tie(group_number, peer_number) = target_peers.at(target_index); - } else { // tcp - size_t target_index = _rng()%target_peers_tcp.size(); - std::tie(group_number, peer_number) = target_peers_tcp.at(target_index); } - size_t chunk_index = _chunk_want_queue.front(); - _chunks_requested[chunk_index] = 0.f; - _chunk_want_queue.pop_front(); - - _tcl.sendFT1RequestPrivate(group_number, peer_number, NGC_FT1_file_kind::HASH_SHA1_CHUNK, _sha1_info.chunks[chunk_index].data.data(), 20); - //std::cout << "sent request " << group_number << ":" << peer_number << "\n"; + std::discrete_distribution tmp_dist{weights.cbegin(), weights.cend()}; + _peer_in_targets_dist.param(tmp_dist.param()); } } + if (!_have_all && !_peer_in_targets.empty() && !_chunk_want_queue.empty() && _chunks_requested.size() + _transfers_receiving_chunk.size() < _max_concurrent_in) { + // send out request, no burst tho + uint32_t group_number; + uint32_t peer_number; + + //size_t target_index = _rng()%target_peers.size(); + size_t target_index = _peer_in_targets_dist(_rng); + std::tie(group_number, peer_number) = _peer_in_targets.at(target_index); + + size_t chunk_index = _chunk_want_queue.front(); + _chunks_requested[chunk_index] = 0.f; + _chunk_want_queue.pop_front(); + + _tcl.sendFT1RequestPrivate(group_number, peer_number, NGC_FT1_file_kind::HASH_SHA1_CHUNK, _sha1_info.chunks[chunk_index].data.data(), 20); + //std::cout << "sent request " << group_number << ":" << peer_number << "\n"; + } + // log _io_log_timer += delta; static const float log_interval {10.f}; @@ -237,6 +276,10 @@ bool SHA1::iterate(float delta) { std::cout << "SHA1 total down: " << _bytes_down / 1024 << "KiB up: " << _bytes_up / 1024 << "KiB\n"; std::cout << "SHA1 cwq:" << _chunk_want_queue.size() << " cwqr:" << _chunks_requested.size() << " trc:" << _transfers_receiving_chunk.size() << " tsc:" << _transfers_sending_chunk.size() << "\n"; + std::cout << "SHA1 peer down speeds:\n"; + for (const auto& [peer, speed] : _peer_in_speed) { + std::cout << " " << peer.first << ":" << peer.second << "(" << _tcl.getGroupPeerName(peer.first, peer.second) << ")" << "\t" << speed / 1024.f << "KiB/s\n"; + } } // TODO: unmap and remap the file every couple of minutes to keep ram usage down? @@ -383,6 +426,8 @@ void SHA1::onFT1ReceiveDataSHA1Chunk(uint32_t group_number, uint32_t peer_number if (std::get<0>(*it) == group_number && std::get<1>(*it) == peer_number && std::get<2>(*it) == transfer_id) { _bytes_down += data_size; + _peer_in_bytes_array[std::make_pair(group_number, peer_number)][_peer_in_bytes_array_index] += data_size; + std::get(*it) = 0.f; // time const size_t chunk_index = std::get<4>(*it); @@ -402,6 +447,7 @@ void SHA1::onFT1ReceiveDataSHA1Chunk(uint32_t group_number, uint32_t peer_number SHA1Digest test_hash = hash_sha1(_file_map.data()+file_offset, chunk_file_size); if (test_hash != _sha1_info.chunks[chunk_index]) { std::cerr << "SHA1 received chunks's hash does not match!, discarding\n"; + _bytes_down -= chunk_file_size; // penalize wrong data _transfers_receiving_chunk.erase(it); _chunk_want_queue.push_front(chunk_index); // put back in queue break; diff --git a/src/states/sha1.hpp b/src/states/sha1.hpp index 4e2f3b8..ae36d9e 100644 --- a/src/states/sha1.hpp +++ b/src/states/sha1.hpp @@ -58,6 +58,8 @@ struct SHA1 final : public StateI { bool haveChunk(const SHA1Digest& hash) const; private: + bool _udp_only {false}; + mio::mmap_sink _file_map; // writable if not all const FTInfoSHA1 _sha1_info; const std::vector _sha1_info_data; @@ -96,6 +98,23 @@ struct SHA1 final : public StateI { // group_number, peer_number, transfer_id(i/o), seconds since (remote) activity, chunk index std::vector> _transfers_sending_chunk; std::vector> _transfers_receiving_chunk; + + static constexpr size_t _peer_speed_mesurement_interval_count {20}; + const float _peer_speed_mesurement_interval {0.5f}; // seconds + float _peer_speed_mesurement_interval_timer {0.f}; // seconds + // bytes received for last 6 intervals for peer + std::map, std::array> _peer_in_bytes_array; + size_t _peer_in_bytes_array_index {0}; // current index into _peer_in_bytes_array. !ringbuffer! + // when chunk data is received, it is added to _peer_in_bytes_array_index in _peer_in_bytes_array + // every _peer_speed_mesurement_interval the avg is calculed and written to _peer_in_speed + // and the _peer_in_bytes_array_index is incremented by 1 + std::map, float> _peer_in_speed; + // speed might be not the actual speed, since wrong data is removed afterwards (on "completion") + // so it can get negative. this makes this more useful for peer selection, less for userfacing stats + // _peer_in_speed feeds directly into _peer_in_targets_dist + std::vector> _peer_in_targets; + std::discrete_distribution _peer_in_targets_dist; + }; } // States diff --git a/src/tox_callbacks.cpp b/src/tox_callbacks.cpp index 59aabb1..6efa7d7 100644 --- a/src/tox_callbacks.cpp +++ b/src/tox_callbacks.cpp @@ -21,6 +21,11 @@ void self_connection_status_cb(Tox*, TOX_CONNECTION connection_status, void *use void friend_request_cb(Tox*, const uint8_t *public_key, const uint8_t *message, size_t length, void *user_data) { static_cast(user_data)->onToxFriendRequest(public_key, std::string_view{reinterpret_cast(message), length}); } + +void group_peer_name_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t* name, size_t length, void *user_data) { + static_cast(user_data)->onToxGroupPeerName(group_number, peer_id, std::string_view{reinterpret_cast(name), length}); +} + void group_custom_packet_cb(Tox*, uint32_t group_number, uint32_t peer_id, const uint8_t *data, size_t length, void *user_data) { static_cast(user_data)->onToxGroupCustomPacket(group_number, peer_id, data, length); } diff --git a/src/tox_callbacks.hpp b/src/tox_callbacks.hpp index 14563e2..9ad2958 100644 --- a/src/tox_callbacks.hpp +++ b/src/tox_callbacks.hpp @@ -19,7 +19,7 @@ void friend_request_cb(Tox *tox, const uint8_t *public_key, const uint8_t *messa //static void friend_message_cb(Tox *tox, uint32_t friend_number, TOX_MESSAGE_TYPE type, const uint8_t *message, size_t length, void *user_data); // ngc -//static void group_peer_name_cb(Tox *tox, uint32_t group_number, uint32_t peer_id, const uint8_t *name, size_t length, void *user_data); +void group_peer_name_cb(Tox *tox, uint32_t group_number, uint32_t peer_id, const uint8_t *name, size_t length, void *user_data); //static void group_peer_status_cb(Tox *tox, uint32_t group_number, uint32_t peer_id, Tox_User_Status status, void *user_data); //static void group_topic_cb(Tox *tox, uint32_t group_number, uint32_t peer_id, const uint8_t *topic, size_t length, void *user_data); //static void group_privacy_state_cb(Tox *tox, uint32_t group_number, Tox_Group_Privacy_State privacy_state, void *user_data); diff --git a/src/tox_client.cpp b/src/tox_client.cpp index e954135..40f50cf 100644 --- a/src/tox_client.cpp +++ b/src/tox_client.cpp @@ -105,6 +105,7 @@ ToxClient::ToxClient(const CommandLine& cl) : //CALLBACK_REG(friend_lossy_packet); //CALLBACK_REG(friend_lossless_packet); + TOX_CALLBACK_REG(group_peer_name); TOX_CALLBACK_REG(group_custom_packet); TOX_CALLBACK_REG(group_custom_private_packet); TOX_CALLBACK_REG(group_invite); @@ -229,6 +230,14 @@ std::string ToxClient::getOwnAddress(void) const { return bin2hex(self_addr); } +std::string_view ToxClient::getGroupPeerName(uint32_t group_number, uint32_t peer_number) const { + if (_groups.count(group_number) && _groups.at(group_number).count(peer_number)) { + return _groups.at(group_number).at(peer_number).name; + } else { + return {""}; + } +} + void ToxClient::onToxSelfConnectionStatus(TOX_CONNECTION connection_status) { std::cout << "TCL self status: "; switch (connection_status) { @@ -255,6 +264,20 @@ void ToxClient::onToxFriendRequest(const uint8_t* public_key, std::string_view m _tox_profile_dirty = true; } +void ToxClient::onToxGroupPeerName(uint32_t group_number, uint32_t peer_id, std::string_view name) { + std::cout << "TCL peer " << group_number << ":" << peer_id << " is now known as " << name << "\n"; + _groups[group_number][peer_id].name = name; +} + +void ToxClient::onToxGroupPeerConnection(uint32_t group_number, uint32_t peer_id, TOX_CONNECTION connection_status) { + std::cout << "TCL peer " << group_number << ":" << peer_id << " status: "; + switch (connection_status) { + case TOX_CONNECTION::TOX_CONNECTION_NONE: std::cout << "offline\n"; break; + case TOX_CONNECTION::TOX_CONNECTION_TCP: std::cout << "TCP-relayed\n"; break; + case TOX_CONNECTION::TOX_CONNECTION_UDP: std::cout << "UDP-direct\n"; break; + } +} + void ToxClient::onToxGroupCustomPacket(uint32_t group_number, uint32_t peer_id, const uint8_t *data, size_t length) { // TODO: signal private? NGC_EXT_handle_group_custom_packet(_tox, _ext_ctx, group_number, peer_id, data, length); @@ -274,8 +297,23 @@ void ToxClient::onToxGroupInvite(uint32_t friend_number, const uint8_t* invite_d void ToxClient::onToxGroupPeerJoin(uint32_t group_number, uint32_t peer_id) { std::cout << "TCL group peer join " << group_number << ":" << peer_id << "\n"; - //_groups[group_number].emplace(peer_id); - _groups[group_number][peer_id] = tox_group_peer_get_connection_status(_tox, group_number, peer_id, nullptr); + + std::vector tmp_name; + { + Tox_Err_Group_Peer_Query err; + size_t length = tox_group_peer_get_name_size(_tox, group_number, peer_id, &err); + if (err == Tox_Err_Group_Peer_Query::TOX_ERR_GROUP_PEER_QUERY_OK) { + tmp_name.resize(length, '\0'); + tox_group_peer_get_name(_tox, group_number, peer_id, tmp_name.data(), nullptr); + } + } + tmp_name.push_back('\0'); // make sure its null terminated + + _groups[group_number][peer_id] = { + tox_group_peer_get_connection_status(_tox, group_number, peer_id, nullptr), + reinterpret_cast(tmp_name.data()) + }; + _tox_profile_dirty = true; } diff --git a/src/tox_client.hpp b/src/tox_client.hpp index dddddd4..1a40d75 100644 --- a/src/tox_client.hpp +++ b/src/tox_client.hpp @@ -33,6 +33,8 @@ struct ToxClient { std::string getOwnAddress(void) const; + std::string_view getGroupPeerName(uint32_t group_number, uint32_t peer_number) const; + template void forEachGroup(FN&& fn) const { for (const auto& it : _groups) { @@ -43,7 +45,8 @@ struct ToxClient { template void forEachGroupPeer(uint32_t group_number, FN&& fn) const { if (_groups.count(group_number)) { - for (const auto [peer_number, connection_status] : _groups.at(group_number)) { + for (const auto& [peer_number, peer] : _groups.at(group_number)) { + const auto& [connection_status, name] = peer; fn(peer_number, connection_status); } } @@ -52,6 +55,8 @@ struct ToxClient { public: // tox callbacks void onToxSelfConnectionStatus(TOX_CONNECTION connection_status); void onToxFriendRequest(const uint8_t* public_key, std::string_view message); + void onToxGroupPeerName(uint32_t group_number, uint32_t peer_id, std::string_view name); + void onToxGroupPeerConnection(uint32_t group_number, uint32_t peer_id, TOX_CONNECTION connection_status); void onToxGroupCustomPacket(uint32_t group_number, uint32_t peer_id, const uint8_t *data, size_t length); void onToxGroupCustomPrivatePacket(uint32_t group_number, uint32_t peer_id, const uint8_t *data, size_t length); void onToxGroupInvite(uint32_t friend_number, const uint8_t* invite_data, size_t invite_length, std::string_view group_name); @@ -87,6 +92,10 @@ struct ToxClient { // key groupid, value set of peer ids //std::map> _groups; - std::map> _groups; + struct Peer { + Tox_Connection connection_status {Tox_Connection::TOX_CONNECTION_NONE}; + std::string name; + }; + std::map> _groups; };