2023-08-19 22:37:55 +02:00
# include "./sha1_ngcft1.hpp"
2024-01-13 22:34:42 +01:00
# include <solanaceae/util/utils.hpp>
2023-08-19 22:37:55 +02:00
# include <solanaceae/contact/components.hpp>
# include <solanaceae/tox_contacts/components.hpp>
# include <solanaceae/message3/components.hpp>
# include <solanaceae/tox_messages/components.hpp>
2024-07-02 16:09:59 +02:00
# include "./util.hpp"
2023-08-19 22:37:55 +02:00
# include "./ft1_sha1_info.hpp"
# include "./hash_utils.hpp"
# include <sodium.h>
# include <entt/container/dense_set.hpp>
2023-08-21 21:32:50 +02:00
# include "./file_rw_mapped.hpp"
2024-06-24 16:42:23 +02:00
# include "./components.hpp"
2024-07-08 18:12:47 +02:00
# include "./contact_components.hpp"
2024-06-28 15:13:17 +02:00
# include "./chunk_picker.hpp"
2024-06-28 21:10:01 +02:00
# include "./participation.hpp"
2024-06-24 16:42:23 +02:00
2023-08-19 22:37:55 +02:00
# include <iostream>
# include <variant>
# include <filesystem>
2023-08-21 21:23:13 +02:00
# include <mutex>
2023-08-21 00:01:14 +02:00
# include <future>
2024-06-23 10:17:48 +02:00
# include <vector>
2023-08-19 22:37:55 +02:00
// File-local addition to Message::Components: Content is an alias for ObjectHandle.
// It is the component type the message-updated handler below reads from a message
// to get at the content object of a transfer.
namespace Message : : Components {
2024-04-29 11:55:11 +02:00
using Content = ObjectHandle ;
2023-08-19 22:37:55 +02:00
} // Message::Components
// Size in bytes of the chunk at chunk_index.
// Every chunk is sha1_info.chunk_size bytes, except for the last one, which
// holds whatever is left of sha1_info.file_size.
static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
	if (chunk_index + 1 != sha1_info.chunks.size()) {
		return sha1_info.chunk_size;
	}

	// last chunk
	// the offset is computed in 64bit, like the read offsets elsewhere in this file,
	// so the multiplication can not wrap where size_t is only 32bit wide
	const uint64_t chunk_offset = uint64_t(chunk_index) * uint64_t(sha1_info.chunk_size);
	return static_cast<size_t>(sha1_info.file_size - chunk_offset);
}
2024-04-29 11:55:11 +02:00
void SHA1_NGCFT1 : : queueUpRequestChunk ( uint32_t group_number , uint32_t peer_number , ObjectHandle content , const SHA1Digest & hash ) {
2024-07-03 12:11:20 +02:00
for ( auto & [ i_g , i_p , i_o , i_h , i_t ] : _queue_requested_chunk ) {
2023-08-19 22:37:55 +02:00
// if already in queue
if ( i_g = = group_number & & i_p = = peer_number & & i_h = = hash ) {
// update timer
i_t = 0.f ;
return ;
}
}
2023-10-15 22:02:34 +02:00
// check for running transfer
2024-07-02 16:09:59 +02:00
if ( _sending_transfers . count ( combine_ids ( group_number , peer_number ) ) ) {
for ( const auto & [ _ , transfer ] : _sending_transfers . at ( combine_ids ( group_number , peer_number ) ) ) {
2023-10-15 22:02:34 +02:00
if ( std : : holds_alternative < SendingTransfer : : Info > ( transfer . v ) ) {
// ignore info
continue ;
}
const auto & t_c = std : : get < SendingTransfer : : Chunk > ( transfer . v ) ;
if ( content ! = t_c . content ) {
// ignore different content
continue ;
}
auto chunk_idx_vec = content . get < Components : : FT1ChunkSHA1Cache > ( ) . chunkIndices ( hash ) ;
for ( size_t idx : chunk_idx_vec ) {
if ( idx = = t_c . chunk_index ) {
// already sending
return ; // skip
}
}
}
}
2023-08-19 22:37:55 +02:00
// not in queue yet
_queue_requested_chunk . push_back ( std : : make_tuple ( group_number , peer_number , content , hash , 0.f ) ) ;
}
2024-04-29 11:55:11 +02:00
void SHA1_NGCFT1 : : updateMessages ( ObjectHandle ce ) {
2023-08-19 22:37:55 +02:00
assert ( ce . all_of < Components : : Messages > ( ) ) ;
for ( auto msg : ce . get < Components : : Messages > ( ) . messages ) {
if ( ce . all_of < Message : : Components : : Transfer : : FileInfo > ( ) & & ! msg . all_of < Message : : Components : : Transfer : : FileInfo > ( ) ) {
msg . emplace < Message : : Components : : Transfer : : FileInfo > ( ce . get < Message : : Components : : Transfer : : FileInfo > ( ) ) ;
}
if ( ce . all_of < Message : : Components : : Transfer : : FileInfoLocal > ( ) ) {
msg . emplace_or_replace < Message : : Components : : Transfer : : FileInfoLocal > ( ce . get < Message : : Components : : Transfer : : FileInfoLocal > ( ) ) ;
}
if ( ce . all_of < Message : : Components : : Transfer : : BytesSent > ( ) ) {
msg . emplace_or_replace < Message : : Components : : Transfer : : BytesSent > ( ce . get < Message : : Components : : Transfer : : BytesSent > ( ) ) ;
}
if ( ce . all_of < Message : : Components : : Transfer : : BytesReceived > ( ) ) {
msg . emplace_or_replace < Message : : Components : : Transfer : : BytesReceived > ( ce . get < Message : : Components : : Transfer : : BytesReceived > ( ) ) ;
}
if ( ce . all_of < Message : : Components : : Transfer : : TagPaused > ( ) ) {
msg . emplace_or_replace < Message : : Components : : Transfer : : TagPaused > ( ) ;
} else {
msg . remove < Message : : Components : : Transfer : : TagPaused > ( ) ;
}
if ( auto * cc = ce . try_get < Components : : FT1ChunkSHA1Cache > ( ) ; cc ! = nullptr & & cc - > have_all ) {
msg . emplace_or_replace < Message : : Components : : Transfer : : TagHaveAll > ( ) ;
}
_rmm . throwEventUpdate ( msg ) ;
}
}
2024-04-29 11:55:11 +02:00
std : : optional < std : : pair < uint32_t , uint32_t > > SHA1_NGCFT1 : : selectPeerForRequest ( ObjectHandle ce ) {
2023-08-19 22:37:55 +02:00
// get a list of peers we can request this file from
std : : vector < std : : pair < uint32_t , uint32_t > > tox_peers ;
for ( const auto c : ce . get < Components : : SuspectedParticipants > ( ) . participants ) {
// TODO: sort by con state?
// prio to direct?
if ( const auto * cs = _cr . try_get < Contact : : Components : : ConnectionState > ( c ) ; cs = = nullptr | | cs - > state = = Contact : : Components : : ConnectionState : : State : : disconnected ) {
continue ;
}
if ( _cr . all_of < Contact : : Components : : ToxGroupPeerEphemeral > ( c ) ) {
const auto & tgpe = _cr . get < Contact : : Components : : ToxGroupPeerEphemeral > ( c ) ;
tox_peers . push_back ( { tgpe . group_number , tgpe . peer_number } ) ;
}
}
2024-06-22 17:01:52 +02:00
// 1 in 40 chance to ask random peer instead
2023-08-19 22:37:55 +02:00
// TODO: config + tweak
// TODO: save group in content to avoid the tox_peers list build
2024-06-22 14:08:12 +02:00
// TODO: remove once pc1_announce is shipped
2024-06-22 17:01:52 +02:00
if ( tox_peers . empty ( ) | | ( _rng ( ) % 40 ) = = 0 ) {
2023-08-19 22:37:55 +02:00
// meh
// HACK: determain group based on last tox_peers
if ( ! tox_peers . empty ( ) ) {
const uint32_t group_number = tox_peers . back ( ) . first ;
auto gch = _tcm . getContactGroup ( group_number ) ;
assert ( static_cast < bool > ( gch ) ) ;
std : : vector < uint32_t > un_tox_peers ;
for ( const auto child : gch . get < Contact : : Components : : ParentOf > ( ) . subs ) {
if ( const auto * cs = _cr . try_get < Contact : : Components : : ConnectionState > ( child ) ; cs = = nullptr | | cs - > state = = Contact : : Components : : ConnectionState : : State : : disconnected ) {
continue ;
}
2024-04-15 11:48:17 +02:00
if ( _cr . all_of < Contact : : Components : : TagSelfStrong > ( child ) ) {
continue ; // skip self
}
2023-08-19 22:37:55 +02:00
if ( _cr . all_of < Contact : : Components : : ToxGroupPeerEphemeral > ( child ) ) {
const auto & tgpe = _cr . get < Contact : : Components : : ToxGroupPeerEphemeral > ( child ) ;
un_tox_peers . push_back ( tgpe . peer_number ) ;
}
}
if ( un_tox_peers . empty ( ) ) {
// no one online, we are out of luck
} else {
const size_t sample_i = _rng ( ) % un_tox_peers . size ( ) ;
const auto peer_number = un_tox_peers . at ( sample_i ) ;
return std : : make_pair ( group_number , peer_number ) ;
}
}
} else {
const size_t sample_i = _rng ( ) % tox_peers . size ( ) ;
const auto [ group_number , peer_number ] = tox_peers . at ( sample_i ) ;
return std : : make_pair ( group_number , peer_number ) ;
}
return std : : nullopt ;
}
2024-07-10 12:27:19 +02:00
void SHA1_NGCFT1 : : queueBitsetSendFull ( Contact3Handle c , ObjectHandle o ) {
if ( ! static_cast < bool > ( c ) | | ! static_cast < bool > ( o ) ) {
assert ( false ) ;
return ;
}
// TODO: only queue if not already sent??
if ( ! o . all_of < Components : : FT1ChunkSHA1Cache , Components : : FT1InfoSHA1 > ( ) ) {
return ;
}
const auto & cc = o . get < Components : : FT1ChunkSHA1Cache > ( ) ;
const auto & info = o . get < Components : : FT1InfoSHA1 > ( ) ;
const auto total_chunks = info . chunks . size ( ) ;
static constexpr size_t bits_per_packet { 8u * 512u } ;
for ( size_t i = 0 ; i < total_chunks ; i + = bits_per_packet ) {
size_t bits_this_packet = std : : min < size_t > ( bits_per_packet , total_chunks - i ) ;
BitSet have ( bits_this_packet ) ; // default init to zero
if ( cc . have_all ) {
// TODO: send have all packet instead
have . invert ( ) ; // we "have_all"
} else {
// TODO: optimize selective copy bitset
for ( size_t j = i ; j < i + bits_this_packet ; j + + ) {
if ( cc . have_chunk [ j ] ) {
have . set ( j - i ) ;
}
}
}
_queue_send_bitset . push_back ( QBitsetEntry { c , o , i , have } ) ;
}
}
2023-08-19 22:37:55 +02:00
// Keeps references to all the services this object works with and subscribes
// itself to the events it has handlers for.
SHA1_NGCFT1::SHA1_NGCFT1(
	ObjectStore2& os,
	Contact3Registry& cr,
	RegistryMessageModel& rmm,
	NGCFT1& nft,
	ToxContactModel2& tcm,
	ToxEventProviderI& tep,
	NGCEXTEventProvider& neep
) :
	_os(os),
	_cr(cr),
	_rmm(rmm),
	_nft(nft),
	_tcm(tcm),
	_tep(tep),
	_neep(neep)
{
	// TODO: also create and destroy
	_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);

	// file transfer events
	for (const auto ft_event : {
		NGCFT1_Event::recv_request,
		NGCFT1_Event::recv_init,
		NGCFT1_Event::recv_data,
		NGCFT1_Event::send_data,
		NGCFT1_Event::recv_done,
		NGCFT1_Event::send_done,
		NGCFT1_Event::recv_message,
	}) {
		_nft.subscribe(this, ft_event);
	}

	//_rmm.subscribe(this, RegistryMessageModel_Event::message_construct);
	//_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
	//_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);

	_rmm.subscribe(this, RegistryMessageModel_Event::send_file_path);

	_tep.subscribe(this, Tox_Event_Type::TOX_EVENT_GROUP_PEER_EXIT);

	// extension packets
	for (const auto ext_event : {
		NGCEXT_Event::PC1_ANNOUNCE,
		NGCEXT_Event::FT1_HAVE,
		NGCEXT_Event::FT1_BITSET,
	}) {
		_neep.subscribe(this, ext_event);
	}
}
2024-07-07 13:21:59 +02:00
// Runs one tick of the transfer logic. `delta` is the time that is added to (or, for the
// chunk picker timers, subtracted from) every timer below; the thresholds are commented in seconds.
// In order, one call:
//  - runs and clears the queued info builder callbacks
//  - ages sending transfers, queued chunk requests, info re-request timers and open chunk
//    requests, dropping the ones that timed out, and ticks the receiving transfers
//  - sends at most one queued bitset
//  - starts at most one outgoing chunk transfer while below _max_concurrent_out
//  - sends at most one info request while below _max_concurrent_in
//  - updates every chunk picker that is tagged for update and sends its new chunk requests
// Returns 2.f if no chunk request is open after the timer pass and 0.05f otherwise
// (presumably the wanted delay until the next call -- TODO confirm against the caller).
float SHA1_NGCFT1 : : iterate ( float delta ) {
2024-07-08 18:12:47 +02:00
//std::cerr << "---------- new tick ----------\n";
2023-08-21 00:01:14 +02:00
// info builder queue
if ( _info_builder_dirty ) {
std : : lock_guard l { _info_builder_queue_mutex } ;
_info_builder_dirty = false ; // set while holding lock
for ( auto & it : _info_builder_queue ) {
//it.fn();
it ( ) ;
}
_info_builder_queue . clear ( ) ;
}
2024-07-07 12:44:17 +02:00
// number of still open chunk requests per contact, filled by the requested chunk timers below
entt : : dense_map < Contact3 , size_t > _peer_open_requests ;
2023-08-19 22:37:55 +02:00
{ // timers
// sending transfers
for ( auto peer_it = _sending_transfers . begin ( ) ; peer_it ! = _sending_transfers . end ( ) ; ) {
for ( auto it = peer_it - > second . begin ( ) ; it ! = peer_it - > second . end ( ) ; ) {
it - > second . time_since_activity + = delta ;
2024-03-09 18:06:49 +01:00
// if we have not heard for 2min, timeout (lower level event on real timeout)
// TODO: do we really need this if we get events?
if ( it - > second . time_since_activity > = 120.f ) {
2023-08-19 22:37:55 +02:00
std : : cerr < < " SHA1_NGCFT1 warning: sending tansfer timed out " < < " . " < < int ( it - > first ) < < " \n " ;
2024-07-09 11:00:59 +02:00
// NOTE(review): with asserts enabled this aborts on a timed out transfer
assert ( false ) ;
2023-08-19 22:37:55 +02:00
it = peer_it - > second . erase ( it ) ;
} else {
it + + ;
}
}
if ( peer_it - > second . empty ( ) ) {
// cleanup unused peers too aggressive?
peer_it = _sending_transfers . erase ( peer_it ) ;
} else {
peer_it + + ;
}
}
// receiving transfers
2024-06-30 14:03:06 +02:00
_receiving_transfers . tick ( delta ) ;
2023-08-19 22:37:55 +02:00
// queued requests
for ( auto it = _queue_requested_chunk . begin ( ) ; it ! = _queue_requested_chunk . end ( ) ; ) {
float & timer = std : : get < float > ( * it ) ;
timer + = delta ;
2024-07-03 12:11:20 +02:00
// forget after 10sec
2023-08-19 22:37:55 +02:00
if ( timer > = 10.f ) {
it = _queue_requested_chunk . erase ( it ) ;
} else {
it + + ;
}
}
{ // requested info timers
2024-04-29 11:55:11 +02:00
// collected first and processed after the view iteration, since processing removes the component
std : : vector < Object > timed_out ;
_os . registry ( ) . view < Components : : ReRequestInfoTimer > ( ) . each ( [ delta , & timed_out ] ( Object e , Components : : ReRequestInfoTimer & rrit ) {
2023-08-19 22:37:55 +02:00
rrit . timer + = delta ;
// 15sec, TODO: config
if ( rrit . timer > = 15.f ) {
timed_out . push_back ( e ) ;
}
} ) ;
for ( const auto e : timed_out ) {
// TODO: avoid dups
2024-04-29 11:55:11 +02:00
_queue_content_want_info . push_back ( _os . objectHandle ( e ) ) ;
_os . registry ( ) . remove < Components : : ReRequestInfoTimer > ( e ) ;
// TODO: throw update?
2023-08-19 22:37:55 +02:00
}
}
{ // requested chunk timers
2024-07-07 12:44:17 +02:00
_os . registry ( ) . view < Components : : FT1ChunkSHA1Requested > ( ) . each ( [ delta , & _peer_open_requests ] ( Components : : FT1ChunkSHA1Requested & ftchunk_requested ) {
2023-08-19 22:37:55 +02:00
for ( auto it = ftchunk_requested . chunks . begin ( ) ; it ! = ftchunk_requested . chunks . end ( ) ; ) {
2024-07-07 12:44:17 +02:00
it - > second . timer + = delta ;
2023-08-19 22:37:55 +02:00
2024-07-09 11:00:59 +02:00
// TODO: config
// requests older than 60 are dropped, all others count as open for their contact
if ( it - > second . timer > = 60.f ) {
2023-08-19 22:37:55 +02:00
it = ftchunk_requested . chunks . erase ( it ) ;
} else {
2024-07-07 12:44:17 +02:00
_peer_open_requests [ it - > second . c ] + = 1 ;
2023-08-19 22:37:55 +02:00
it + + ;
}
}
} ) ;
}
}
2024-07-10 12:27:19 +02:00
{ // send out bitsets
// currently 1 per tick
if ( ! _queue_send_bitset . empty ( ) ) {
const auto & qe = _queue_send_bitset . front ( ) ;
// TODO: build bitset inplace instead, to not miss any chunks arrived this tick
if ( qe . c . all_of < Contact : : Components : : ToxGroupPeerEphemeral > ( ) & & qe . o . all_of < Components : : FT1InfoSHA1Hash > ( ) ) {
const auto [ group_number , peer_number ] = qe . c . get < Contact : : Components : : ToxGroupPeerEphemeral > ( ) ;
const auto & info_hash = qe . o . get < Components : : FT1InfoSHA1Hash > ( ) . hash ;
// TODO: only pop if sent?
_neep . send_ft1_bitset (
group_number , peer_number ,
static_cast < uint32_t > ( NGCFT1_file_kind : : HASH_SHA1_INFO ) ,
info_hash . data ( ) , info_hash . size ( ) ,
qe . start_index ,
qe . have . _bytes . data ( ) , qe . have . size_bytes ( )
) ;
}
// the entry is dropped even if it could not be sent
_queue_send_bitset . pop_front ( ) ;
}
}
2023-08-19 22:37:55 +02:00
// if we have not reached the total cap for transfers
// count running transfers
size_t running_sending_transfer_count { 0 } ;
for ( const auto & [ _ , transfers ] : _sending_transfers ) {
running_sending_transfer_count + = transfers . size ( ) ;
}
2024-06-30 14:03:06 +02:00
size_t running_receiving_transfer_count { _receiving_transfers . size ( ) } ;
2023-08-19 22:37:55 +02:00
if ( running_sending_transfer_count < _max_concurrent_out ) {
// TODO: for each peer? transfer cap per peer?
// TODO: info queue
if ( ! _queue_requested_chunk . empty ( ) ) { // then check for chunk requests
const auto [ group_number , peer_number , ce , chunk_hash , _ ] = _queue_requested_chunk . front ( ) ;
auto chunk_idx_vec = ce . get < Components : : FT1ChunkSHA1Cache > ( ) . chunkIndices ( chunk_hash ) ;
if ( ! chunk_idx_vec . empty ( ) ) {
// check if already sending
bool already_sending_to_this_peer = false ;
2024-07-02 16:09:59 +02:00
if ( _sending_transfers . count ( combine_ids ( group_number , peer_number ) ) ) {
for ( const auto & [ _2 , t ] : _sending_transfers . at ( combine_ids ( group_number , peer_number ) ) ) {
2023-08-19 22:37:55 +02:00
if ( std : : holds_alternative < SendingTransfer : : Chunk > ( t . v ) ) {
const auto & v = std : : get < SendingTransfer : : Chunk > ( t . v ) ;
if ( v . content = = ce & & v . chunk_index = = chunk_idx_vec . front ( ) ) {
// already sending
already_sending_to_this_peer = true ;
break ;
}
}
}
}
if ( ! already_sending_to_this_peer ) {
const auto & info = ce . get < Components : : FT1InfoSHA1 > ( ) ;
uint8_t transfer_id { 0 } ;
// the transfer is only recorded if the init was sent successfully
if ( _nft . NGC_FT1_send_init_private (
group_number , peer_number ,
static_cast < uint32_t > ( NGCFT1_file_kind : : HASH_SHA1_CHUNK ) ,
chunk_hash . data . data ( ) , chunk_hash . size ( ) ,
chunkSize ( info , chunk_idx_vec . front ( ) ) ,
& transfer_id
) ) {
_sending_transfers
2024-07-02 16:09:59 +02:00
[ combine_ids ( group_number , peer_number ) ]
2023-08-19 22:37:55 +02:00
[ transfer_id ] // TODO: also save index?
. v = SendingTransfer : : Chunk { ce , chunk_idx_vec . front ( ) } ;
}
} // else just remove from queue
}
// remove from queue regardless
_queue_requested_chunk . pop_front ( ) ;
}
}
if ( running_receiving_transfer_count < _max_concurrent_in ) {
// strictly prioritize info
if ( ! _queue_content_want_info . empty ( ) ) {
const auto ce = _queue_content_want_info . front ( ) ;
// make sure we are missing the info
assert ( ! ce . all_of < Components : : ReRequestInfoTimer > ( ) ) ;
assert ( ! ce . all_of < Components : : FT1InfoSHA1 > ( ) ) ;
assert ( ! ce . all_of < Components : : FT1InfoSHA1Data > ( ) ) ;
assert ( ! ce . all_of < Components : : FT1ChunkSHA1Cache > ( ) ) ;
assert ( ce . all_of < Components : : FT1InfoSHA1Hash > ( ) ) ;
auto selected_peer_opt = selectPeerForRequest ( ce ) ;
// without a peer the entry stays at the front of the queue for the next tick
if ( selected_peer_opt . has_value ( ) ) {
const auto [ group_number , peer_number ] = selected_peer_opt . value ( ) ;
//const auto& info = msg.get<Components::FT1InfoSHA1>();
const auto & info_hash = ce . get < Components : : FT1InfoSHA1Hash > ( ) . hash ;
_nft . NGC_FT1_send_request_private (
group_number , peer_number ,
static_cast < uint32_t > ( NGCFT1_file_kind : : HASH_SHA1_INFO ) ,
info_hash . data ( ) , info_hash . size ( )
) ;
ce . emplace < Components : : ReRequestInfoTimer > ( 0.f ) ;
_queue_content_want_info . pop_front ( ) ;
std : : cout < < " SHA1_NGCFT1: sent info request for [ " < < SHA1Digest { info_hash } < < " ] to " < < group_number < < " : " < < peer_number < < " \n " ;
}
}
2024-07-08 18:12:47 +02:00
}
2024-07-03 12:11:20 +02:00
2024-07-08 18:12:47 +02:00
// ran regardless of _max_concurrent_in
{ // new chunk picker code
2024-07-07 14:15:26 +02:00
// contacts whose chunk picker gets removed once all views below are done
std : : vector < Contact3Handle > cp_to_remove ;
2024-07-08 18:12:47 +02:00
// first, update timers
_cr . view < ChunkPickerTimer > ( ) . each ( [ this , delta ] ( const Contact3 cv , ChunkPickerTimer & cpt ) {
cpt . timer - = delta ;
if ( cpt . timer < = 0.f ) {
_cr . emplace_or_replace < ChunkPickerUpdateTag > ( cv ) ;
}
} ) ;
//std::cout << "number of chunkpickers: " << _cr.storage<ChunkPicker>().size() << ", of which " << _cr.storage<ChunkPickerUpdateTag>().size() << " need updating\n";
// now check for potentially missing cp
auto cput_view = _cr . view < ChunkPickerUpdateTag > ( ) ;
2024-07-09 11:04:19 +02:00
cput_view . each ( [ this , & cp_to_remove ] ( const Contact3 cv ) {
2024-07-08 18:12:47 +02:00
Contact3Handle c { _cr , cv } ;
//std::cout << "cput :)\n";
// NOTE(review): this checks any_of, the update pass below requires all_of of the same two components
if ( ! c . any_of < Contact : : Components : : ToxGroupPeerEphemeral , Contact : : Components : : FT1Participation > ( ) ) {
std : : cout < < " cput uh nuh :( \n " ;
2024-07-09 11:04:19 +02:00
cp_to_remove . push_back ( c ) ;
2024-07-08 18:12:47 +02:00
return ;
}
if ( ! c . all_of < ChunkPicker > ( ) ) {
std : : cout < < " creating new cp!! \n " ;
c . emplace < ChunkPicker > ( ) ;
c . emplace_or_replace < ChunkPickerTimer > ( ) ;
}
} ) ;
// now update all cp that are tagged
2024-07-09 11:04:19 +02:00
_cr . view < ChunkPicker , ChunkPickerUpdateTag > ( ) . each ( [ this , & _peer_open_requests , & cp_to_remove ] ( const Contact3 cv , ChunkPicker & cp ) {
2024-07-03 12:11:20 +02:00
Contact3Handle c { _cr , cv } ;
2024-07-07 14:15:26 +02:00
2024-07-08 18:46:26 +02:00
if ( ! c . all_of < Contact : : Components : : ToxGroupPeerEphemeral , Contact : : Components : : FT1Participation > ( ) ) {
2024-07-09 11:04:19 +02:00
cp_to_remove . push_back ( c ) ;
2024-07-07 14:15:26 +02:00
return ;
}
2024-07-08 18:12:47 +02:00
//std::cout << "cpu :)\n";
2024-07-03 12:11:20 +02:00
// HACK: expensive, dont do every tick, only on events
// do verification in debug instead?
2024-07-08 18:12:47 +02:00
//cp.validateParticipation(c, _os.registry());
2024-07-03 12:11:20 +02:00
2024-07-07 12:44:17 +02:00
// open requests counted for this contact in the timer pass, 0 if there are none
size_t peer_open_request = 0 ;
if ( _peer_open_requests . contains ( c ) ) {
peer_open_request + = _peer_open_requests . at ( c ) ;
}
2024-07-03 12:11:20 +02:00
auto new_requests = cp . updateChunkRequests (
c ,
_os . registry ( ) ,
2024-07-07 12:44:17 +02:00
_receiving_transfers ,
peer_open_request
2024-07-03 12:11:20 +02:00
) ;
if ( new_requests . empty ( ) ) {
2024-07-08 18:12:47 +02:00
// updateChunkRequests updates the unfinished
// TODO: pull out and check there?
if ( cp . participating_unfinished . empty ( ) ) {
2024-07-10 10:41:25 +02:00
std : : cout < < " destroying empty useless cp \n " ;
2024-07-08 18:12:47 +02:00
cp_to_remove . push_back ( c ) ;
} else {
2024-07-09 14:45:00 +02:00
// most likely will have something soon
// TODO: mark dirty on have instead?
c . get_or_emplace < ChunkPickerTimer > ( ) . timer = 10.f ;
2024-07-08 18:12:47 +02:00
}
2024-07-03 12:11:20 +02:00
return ;
}
assert ( c . all_of < Contact : : Components : : ToxGroupPeerEphemeral > ( ) ) ;
const auto [ group_number , peer_number ] = c . get < Contact : : Components : : ToxGroupPeerEphemeral > ( ) ;
for ( const auto [ r_o , r_idx ] : new_requests ) {
// NOTE(review): cc is not used anywhere in this loop body
auto & cc = r_o . get < Components : : FT1ChunkSHA1Cache > ( ) ;
const auto & info = r_o . get < Components : : FT1InfoSHA1 > ( ) ;
// request chunk_idx
_nft . NGC_FT1_send_request_private (
group_number , peer_number ,
static_cast < uint32_t > ( NGCFT1_file_kind : : HASH_SHA1_CHUNK ) ,
info . chunks . at ( r_idx ) . data . data ( ) , info . chunks . at ( r_idx ) . size ( )
) ;
std : : cout < < " SHA1_NGCFT1: requesting chunk [ " < < info . chunks . at ( r_idx ) < < " ] from " < < group_number < < " : " < < peer_number < < " \n " ;
}
2024-07-08 18:12:47 +02:00
// force update every minute
// TODO: add small random bias to spread load
c . get_or_emplace < ChunkPickerTimer > ( ) . timer = 60.f ;
2024-07-03 12:11:20 +02:00
} ) ;
2024-07-07 14:15:26 +02:00
2024-07-08 18:12:47 +02:00
// unmark all marked
_cr . clear < ChunkPickerUpdateTag > ( ) ;
assert ( _cr . storage < ChunkPickerUpdateTag > ( ) . empty ( ) ) ;
2024-07-07 14:15:26 +02:00
for ( const auto & c : cp_to_remove ) {
2024-07-08 18:12:47 +02:00
c . remove < ChunkPicker , ChunkPickerTimer > ( ) ;
}
2023-08-19 22:37:55 +02:00
}
2024-07-07 13:21:59 +02:00
if ( _peer_open_requests . empty ( ) ) {
return 2.f ;
} else {
// pretty conservative and should be adjusted on a per peer, per delay basis
// seems to do the trick
return 0.05f ;
}
2023-08-19 22:37:55 +02:00
}
bool SHA1_NGCFT1 : : onEvent ( const Message : : Events : : MessageUpdated & e ) {
// see tox_transfer_manager.cpp for reference
if ( ! e . e . all_of < Message : : Components : : Transfer : : ActionAccept , Message : : Components : : Content > ( ) ) {
return false ;
}
//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path);
auto ce = e . e . get < Message : : Components : : Content > ( ) ;
//if (!ce.all_of<Components::FT1InfoSHA1, Components::FT1ChunkSHA1Cache>()) {
if ( ! ce . all_of < Components : : FT1InfoSHA1 > ( ) ) {
// not ready to load yet, skip
return false ;
}
assert ( ! ce . all_of < Components : : FT1ChunkSHA1Cache > ( ) ) ;
assert ( ! ce . all_of < Message : : Components : : Transfer : : File > ( ) ) ;
// first, open file for write(+readback)
std : : string full_file_path { e . e . get < Message : : Components : : Transfer : : ActionAccept > ( ) . save_to_path } ;
// TODO: replace with filesystem or something
if ( full_file_path . back ( ) ! = ' / ' ) {
full_file_path + = " / " ;
}
2023-12-13 15:45:36 +01:00
// ensure dir exists
2023-08-19 22:37:55 +02:00
std : : filesystem : : create_directories ( full_file_path ) ;
const auto & info = ce . get < Components : : FT1InfoSHA1 > ( ) ;
full_file_path + = info . file_name ;
ce . emplace < Message : : Components : : Transfer : : FileInfoLocal > ( std : : vector { full_file_path } ) ;
const bool file_exists = std : : filesystem : : exists ( full_file_path ) ;
2024-05-27 11:20:37 +02:00
std : : unique_ptr < File2I > file_impl = std : : make_unique < File2RWMapped > ( full_file_path , info . file_size ) ;
2023-08-19 22:37:55 +02:00
if ( ! file_impl - > isGood ( ) ) {
std : : cerr < < " SHA1_NGCFT1 error: failed opening file ' " < < full_file_path < < " '! \n " ;
2023-12-13 15:45:36 +01:00
// we failed opening that filepath, so we should offer the user the oportunity to save it differently
e . e . remove < Message : : Components : : Transfer : : ActionAccept > ( ) ; // stop
2023-08-19 22:37:55 +02:00
return false ;
}
{ // next, create chuck cache and check for existing data
auto & cc = ce . emplace < Components : : FT1ChunkSHA1Cache > ( ) ;
2023-08-22 17:20:56 +02:00
auto & bytes_received = ce . get_or_emplace < Message : : Components : : Transfer : : BytesReceived > ( ) . total ;
2024-06-25 12:08:17 +02:00
cc . have_chunk = BitSet ( info . chunks . size ( ) ) ;
2023-08-19 22:37:55 +02:00
cc . have_all = false ;
cc . have_count = 0 ;
cc . chunk_hash_to_index . clear ( ) ; // if copy pasta
if ( file_exists ) {
// iterate existing file
for ( size_t i = 0 ; i < info . chunks . size ( ) ; i + + ) {
2023-08-22 17:20:56 +02:00
const uint64_t chunk_size = info . chunkSize ( i ) ;
2024-05-27 11:20:37 +02:00
auto existing_data = file_impl - > read ( chunk_size , i * uint64_t ( info . chunk_size ) ) ;
2023-08-22 17:20:56 +02:00
2024-05-27 11:20:37 +02:00
assert ( existing_data . size = = chunk_size ) ;
if ( existing_data . size = = chunk_size ) {
const auto data_hash = SHA1Digest { hash_sha1 ( existing_data . ptr , existing_data . size ) } ;
const bool data_equal = data_hash = = info . chunks . at ( i ) ;
2023-08-22 17:20:56 +02:00
2024-05-27 11:20:37 +02:00
if ( data_equal ) {
2024-06-25 12:08:17 +02:00
cc . have_chunk . set ( i ) ;
2024-05-27 11:20:37 +02:00
cc . have_count + = 1 ;
bytes_received + = chunk_size ;
//std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n";
} else {
//std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n";
}
2023-08-22 17:20:56 +02:00
} else {
2024-05-27 11:20:37 +02:00
// error reading?
2023-08-19 22:37:55 +02:00
}
_chunks [ info . chunks [ i ] ] = ce ;
cc . chunk_hash_to_index [ info . chunks [ i ] ] . push_back ( i ) ;
}
2023-08-22 17:20:56 +02:00
std : : cout < < " preexisting " < < cc . have_count < < " / " < < info . chunks . size ( ) < < " \n " ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
if ( cc . have_count > = info . chunks . size ( ) ) {
2023-08-19 22:37:55 +02:00
cc . have_all = true ;
2023-08-22 17:20:56 +02:00
//ce.remove<Message::Components::Transfer::BytesReceived>();
2023-08-19 22:37:55 +02:00
}
} else {
for ( size_t i = 0 ; i < info . chunks . size ( ) ; i + + ) {
_chunks [ info . chunks [ i ] ] = ce ;
cc . chunk_hash_to_index [ info . chunks [ i ] ] . push_back ( i ) ;
}
}
}
ce . emplace < Message : : Components : : Transfer : : File > ( std : : move ( file_impl ) ) ;
2024-06-23 10:17:48 +02:00
// announce we are participating
// since this is the first time, we publicly announce to all
if ( e . e . all_of < Message : : Components : : ContactFrom , Message : : Components : : ContactTo > ( ) ) {
const auto c_f = e . e . get < Message : : Components : : ContactFrom > ( ) . c ;
const auto c_t = e . e . get < Message : : Components : : ContactTo > ( ) . c ;
std : : vector < uint8_t > announce_id ;
const uint32_t file_kind = static_cast < uint32_t > ( NGCFT1_file_kind : : HASH_SHA1_INFO ) ;
for ( size_t i = 0 ; i < sizeof ( file_kind ) ; i + + ) {
announce_id . push_back ( ( file_kind > > ( i * 8 ) ) & 0xff ) ;
}
assert ( ce . all_of < Components : : FT1InfoSHA1Hash > ( ) ) ;
const auto & info_hash = ce . get < Components : : FT1InfoSHA1Hash > ( ) . hash ;
announce_id . insert ( announce_id . cend ( ) , info_hash . cbegin ( ) , info_hash . cend ( ) ) ;
if ( _cr . all_of < Contact : : Components : : ToxGroupEphemeral > ( c_t ) ) {
// public
const auto group_number = _cr . get < Contact : : Components : : ToxGroupEphemeral > ( c_t ) . group_number ;
_neep . send_all_pc1_announce ( group_number , announce_id . data ( ) , announce_id . size ( ) ) ;
} else if ( _cr . all_of < Contact : : Components : : ToxGroupPeerEphemeral > ( c_f ) ) {
// private ?
const auto [ group_number , peer_number ] = _cr . get < Contact : : Components : : ToxGroupPeerEphemeral > ( c_f ) ;
_neep . send_pc1_announce ( group_number , peer_number , announce_id . data ( ) , announce_id . size ( ) ) ;
}
}
2023-08-19 22:37:55 +02:00
ce . remove < Message : : Components : : Transfer : : TagPaused > ( ) ;
2024-07-08 18:12:47 +02:00
// start requesting from all participants
if ( ce . all_of < Components : : SuspectedParticipants > ( ) ) {
std : : cout < < " accepted ft has " < < ce . get < Components : : SuspectedParticipants > ( ) . participants . size ( ) < < " sp \n " ;
for ( const auto cv : ce . get < Components : : SuspectedParticipants > ( ) . participants ) {
_cr . emplace_or_replace < ChunkPickerUpdateTag > ( cv ) ;
}
} else {
std : : cout < < " accepted ft has NO sp! \n " ;
}
2023-08-19 22:37:55 +02:00
// should?
e . e . remove < Message : : Components : : Transfer : : ActionAccept > ( ) ;
updateMessages ( ce ) ;
return false ;
}
bool SHA1_NGCFT1 : : onEvent ( const Events : : NGCFT1_recv_request & e ) {
// only interested in sha1
if ( e . file_kind ! = NGCFT1_file_kind : : HASH_SHA1_INFO & & e . file_kind ! = NGCFT1_file_kind : : HASH_SHA1_CHUNK ) {
return false ;
}
//std::cout << "SHA1_NGCFT1: FT1_REQUEST fk:" << int(e.file_kind) << " [" << bin2hex({e.file_id, e.file_id+e.file_id_size}) << "]\n";
if ( e . file_kind = = NGCFT1_file_kind : : HASH_SHA1_INFO ) {
if ( e . file_id_size ! = 20 ) {
// error
return false ;
}
SHA1Digest info_hash { e . file_id , e . file_id_size } ;
if ( ! _info_to_content . count ( info_hash ) ) {
// we dont know about this
return false ;
}
auto content = _info_to_content . at ( info_hash ) ;
if ( ! content . all_of < Components : : FT1InfoSHA1Data > ( ) ) {
// we dont have the info for that infohash (yet?)
return false ;
}
// TODO: queue instead
//queueUpRequestInfo(e.group_number, e.peer_number, info_hash);
uint8_t transfer_id { 0 } ;
_nft . NGC_FT1_send_init_private (
e . group_number , e . peer_number ,
static_cast < uint32_t > ( e . file_kind ) ,
e . file_id , e . file_id_size ,
content . get < Components : : FT1InfoSHA1Data > ( ) . data . size ( ) ,
& transfer_id
) ;
_sending_transfers
2024-07-02 16:09:59 +02:00
[ combine_ids ( e . group_number , e . peer_number ) ]
2023-08-19 22:37:55 +02:00
[ transfer_id ]
. v = SendingTransfer : : Info { content . get < Components : : FT1InfoSHA1Data > ( ) . data } ;
2024-07-09 11:00:59 +02:00
const auto c = _tcm . getContactGroupPeer ( e . group_number , e . peer_number ) ;
_tox_peer_to_contact [ combine_ids ( e . group_number , e . peer_number ) ] = c ; // workaround
2023-08-19 22:37:55 +02:00
} else if ( e . file_kind = = NGCFT1_file_kind : : HASH_SHA1_CHUNK ) {
if ( e . file_id_size ! = 20 ) {
// error
return false ;
}
SHA1Digest chunk_hash { e . file_id , e . file_id_size } ;
if ( ! _chunks . count ( chunk_hash ) ) {
// we dont know about this
return false ;
}
2024-07-03 12:11:20 +02:00
auto o = _chunks . at ( chunk_hash ) ;
2023-08-19 22:37:55 +02:00
{ // they advertise interest in the content
const auto c = _tcm . getContactGroupPeer ( e . group_number , e . peer_number ) ;
2024-07-09 11:00:59 +02:00
_tox_peer_to_contact [ combine_ids ( e . group_number , e . peer_number ) ] = c ; // workaround
2024-07-08 18:12:47 +02:00
if ( addParticipation ( c , o ) ) {
// something happend, update chunk picker
assert ( static_cast < bool > ( c ) ) ;
c . emplace_or_replace < ChunkPickerUpdateTag > ( ) ;
}
2023-08-19 22:37:55 +02:00
}
2024-07-03 12:11:20 +02:00
assert ( o . all_of < Components : : FT1ChunkSHA1Cache > ( ) ) ;
2023-08-19 22:37:55 +02:00
2024-07-03 12:11:20 +02:00
if ( ! o . get < Components : : FT1ChunkSHA1Cache > ( ) . haveChunk ( chunk_hash ) ) {
2023-08-19 22:37:55 +02:00
// we dont have the chunk
return false ;
}
// queue good request
2024-07-03 12:11:20 +02:00
queueUpRequestChunk ( e . group_number , e . peer_number , o , chunk_hash ) ;
2023-08-19 22:37:55 +02:00
} else {
assert ( false & & " unhandled case " ) ;
}
return true ;
}
bool SHA1_NGCFT1 : : onEvent ( const Events : : NGCFT1_recv_init & e ) {
// only interested in sha1
if ( e . file_kind ! = NGCFT1_file_kind : : HASH_SHA1_INFO & & e . file_kind ! = NGCFT1_file_kind : : HASH_SHA1_CHUNK ) {
return false ;
}
// TODO: make sure we requested this?
if ( e . file_kind = = NGCFT1_file_kind : : HASH_SHA1_INFO ) {
SHA1Digest sha1_info_hash { e . file_id , e . file_id_size } ;
if ( ! _info_to_content . count ( sha1_info_hash ) ) {
// no idea about this content
return false ;
}
auto ce = _info_to_content . at ( sha1_info_hash ) ;
if ( ce . any_of < Components : : FT1InfoSHA1 , Components : : FT1InfoSHA1Data , Components : : FT1ChunkSHA1Cache > ( ) ) {
// we already have the info (should)
return false ;
}
// TODO: check if e.file_size too large / ask for permission
if ( e . file_size > 100 * 1024 * 1024 ) {
// a info size of 100MiB is ~640GiB for a 128KiB chunk size (default)
return false ;
}
2024-06-30 14:03:06 +02:00
_receiving_transfers . emplaceInfo (
2024-07-02 16:09:59 +02:00
e . group_number , e . peer_number ,
2024-06-30 14:03:06 +02:00
e . transfer_id ,
{ ce , std : : vector < uint8_t > ( e . file_size ) }
) ;
2023-08-19 22:37:55 +02:00
e . accept = true ;
2024-07-09 11:00:59 +02:00
const auto c = _tcm . getContactGroupPeer ( e . group_number , e . peer_number ) ;
_tox_peer_to_contact [ combine_ids ( e . group_number , e . peer_number ) ] = c ; // workaround
2023-08-19 22:37:55 +02:00
} else if ( e . file_kind = = NGCFT1_file_kind : : HASH_SHA1_CHUNK ) {
SHA1Digest sha1_chunk_hash { e . file_id , e . file_id_size } ;
if ( ! _chunks . count ( sha1_chunk_hash ) ) {
// no idea about this content
return false ;
}
2024-07-03 12:11:20 +02:00
auto o = _chunks . at ( sha1_chunk_hash ) ;
2023-08-19 22:37:55 +02:00
{ // they have the content (probably, might be fake, should move this to done)
const auto c = _tcm . getContactGroupPeer ( e . group_number , e . peer_number ) ;
2024-07-09 11:00:59 +02:00
_tox_peer_to_contact [ combine_ids ( e . group_number , e . peer_number ) ] = c ; // workaround
2024-07-08 18:12:47 +02:00
if ( addParticipation ( c , o ) ) {
// something happend, update chunk picker
assert ( static_cast < bool > ( c ) ) ;
c . emplace_or_replace < ChunkPickerUpdateTag > ( ) ;
}
2023-08-19 22:37:55 +02:00
}
2024-07-03 12:11:20 +02:00
assert ( o . all_of < Components : : FT1InfoSHA1 > ( ) ) ;
assert ( o . all_of < Components : : FT1ChunkSHA1Cache > ( ) ) ;
2023-08-19 22:37:55 +02:00
2024-07-03 12:11:20 +02:00
const auto & cc = o . get < Components : : FT1ChunkSHA1Cache > ( ) ;
2023-08-19 22:37:55 +02:00
if ( cc . haveChunk ( sha1_chunk_hash ) ) {
std : : cout < < " SHA1_NGCFT1: chunk rejected, already have [ " < < SHA1Digest { sha1_chunk_hash } < < " ] \n " ;
// we have the chunk
return false ;
}
// TODO: cache position
// calc offset_into_file
auto idx_vec = cc . chunkIndices ( sha1_chunk_hash ) ;
assert ( ! idx_vec . empty ( ) ) ;
2024-07-03 12:11:20 +02:00
// CHECK IF TRANSFER IN PROGESS!!
for ( const auto idx : idx_vec ) {
if ( _receiving_transfers . containsPeerChunk ( e . group_number , e . peer_number , o , idx ) ) {
std : : cerr < < " SHA1_NGCFT1 error: " < < e . group_number < < " : " < < e . peer_number < < " offered chunk( " < < idx < < " ) it is already receiving!! \n " ;
return false ;
}
}
const auto & info = o . get < Components : : FT1InfoSHA1 > ( ) ;
2023-08-19 22:37:55 +02:00
// TODO: check e.file_size
assert ( e . file_size = = info . chunkSize ( idx_vec . front ( ) ) ) ;
2024-06-30 14:03:06 +02:00
_receiving_transfers . emplaceChunk (
2024-07-02 16:09:59 +02:00
e . group_number , e . peer_number ,
2024-06-30 14:03:06 +02:00
e . transfer_id ,
2024-07-03 12:11:20 +02:00
ReceivingTransfers : : Entry : : Chunk { o , idx_vec }
2024-06-30 14:03:06 +02:00
) ;
2023-08-19 22:37:55 +02:00
e . accept = true ;
2024-07-07 12:44:17 +02:00
// now running, remove from requested
for ( const auto it : _receiving_transfers . getTransfer ( e . group_number , e . peer_number , e . transfer_id ) . getChunk ( ) . chunk_indices ) {
o . get_or_emplace < Components : : FT1ChunkSHA1Requested > ( ) . chunks . erase ( it ) ;
}
2023-08-19 22:37:55 +02:00
std : : cout < < " SHA1_NGCFT1: accepted chunk [ " < < SHA1Digest { sha1_chunk_hash } < < " ] \n " ;
} else {
assert ( false & & " unhandled case " ) ;
}
return true ;
}
bool SHA1_NGCFT1 : : onEvent ( const Events : : NGCFT1_recv_data & e ) {
2024-07-02 16:09:59 +02:00
if ( ! _receiving_transfers . containsPeerTransfer ( e . group_number , e . peer_number , e . transfer_id ) ) {
2023-08-19 22:37:55 +02:00
return false ;
}
2024-07-02 16:09:59 +02:00
auto & transfer = _receiving_transfers . getTransfer ( e . group_number , e . peer_number , e . transfer_id ) ;
2023-08-19 22:37:55 +02:00
2024-06-30 14:03:06 +02:00
transfer . time_since_activity = 0.f ;
if ( transfer . isInfo ( ) ) {
auto & info_data = transfer . getInfo ( ) . info_data ;
2023-08-19 22:37:55 +02:00
for ( size_t i = 0 ; i < e . data_size & & i + e . data_offset < info_data . size ( ) ; i + + ) {
info_data [ i + e . data_offset ] = e . data [ i ] ;
}
2024-06-30 14:03:06 +02:00
} else if ( transfer . isChunk ( ) ) {
auto o = transfer . getChunk ( ) . content ;
2023-08-19 22:37:55 +02:00
2024-06-30 14:03:06 +02:00
assert ( o . all_of < Message : : Components : : Transfer : : File > ( ) ) ;
auto * file = o . get < Message : : Components : : Transfer : : File > ( ) . get ( ) ;
2023-08-19 22:37:55 +02:00
assert ( file ! = nullptr ) ;
2024-06-30 14:03:06 +02:00
const auto chunk_size = o . get < Components : : FT1InfoSHA1 > ( ) . chunk_size ;
for ( const auto chunk_index : transfer . getChunk ( ) . chunk_indices ) {
const auto offset_into_file = chunk_index * chunk_size ;
2023-08-19 22:37:55 +02:00
2024-05-27 11:20:37 +02:00
if ( ! file - > write ( { e . data , e . data_size } , offset_into_file + e . data_offset ) ) {
2023-08-21 21:23:13 +02:00
std : : cerr < < " SHA1_NGCFT1 error: writing file failed o: " < < offset_into_file + e . data_offset < < " \n " ;
}
2023-08-19 22:37:55 +02:00
}
} else {
assert ( false & & " unhandled case " ) ;
}
return true ;
}
bool SHA1_NGCFT1 : : onEvent ( const Events : : NGCFT1_send_data & e ) {
2024-07-02 16:09:59 +02:00
if ( ! _sending_transfers . count ( combine_ids ( e . group_number , e . peer_number ) ) ) {
2023-08-19 22:37:55 +02:00
return false ;
}
2024-07-02 16:09:59 +02:00
auto & peer = _sending_transfers . at ( combine_ids ( e . group_number , e . peer_number ) ) ;
2023-08-19 22:37:55 +02:00
if ( ! peer . count ( e . transfer_id ) ) {
return false ;
}
auto & transfer = peer . at ( e . transfer_id ) ;
2023-08-24 18:04:25 +02:00
transfer . time_since_activity = 0.f ;
2023-08-19 22:37:55 +02:00
if ( std : : holds_alternative < SendingTransfer : : Info > ( transfer . v ) ) {
auto & info_transfer = std : : get < SendingTransfer : : Info > ( transfer . v ) ;
for ( size_t i = 0 ; i < e . data_size & & ( i + e . data_offset ) < info_transfer . info_data . size ( ) ; i + + ) {
e . data [ i ] = info_transfer . info_data [ i + e . data_offset ] ;
}
if ( e . data_offset + e . data_size > = info_transfer . info_data . size ( ) ) {
// was last read (probably TODO: add transfer destruction event)
peer . erase ( e . transfer_id ) ;
}
} else if ( std : : holds_alternative < SendingTransfer : : Chunk > ( transfer . v ) ) {
auto & chunk_transfer = std : : get < SendingTransfer : : Chunk > ( transfer . v ) ;
const auto & info = chunk_transfer . content . get < Components : : FT1InfoSHA1 > ( ) ;
// TODO: should we really use file?
2024-05-27 11:20:37 +02:00
const auto data = chunk_transfer . content . get < Message : : Components : : Transfer : : File > ( ) - > read (
e . data_size ,
( chunk_transfer . chunk_index * uint64_t ( info . chunk_size ) ) + e . data_offset
) ;
2023-08-19 22:37:55 +02:00
// TODO: optimize
2024-05-27 11:20:37 +02:00
for ( size_t i = 0 ; i < e . data_size & & i < data . size ; i + + ) {
2023-08-19 22:37:55 +02:00
e . data [ i ] = data [ i ] ;
}
2024-05-27 11:20:37 +02:00
chunk_transfer . content . get_or_emplace < Message : : Components : : Transfer : : BytesSent > ( ) . total + = data . size ;
2023-08-19 22:37:55 +02:00
// TODO: add event to propergate to messages
//_rmm.throwEventUpdate(transfer); // should we?
//if (e.data_offset + e.data_size >= *insert chunk size here*) {
//// was last read (probably TODO: add transfer destruction event)
//peer.erase(e.transfer_id);
//}
} else {
assert ( false & & " not implemented? " ) ;
}
return true ;
}
bool SHA1_NGCFT1 : : onEvent ( const Events : : NGCFT1_recv_done & e ) {
2024-07-02 16:09:59 +02:00
if ( ! _receiving_transfers . containsPeerTransfer ( e . group_number , e . peer_number , e . transfer_id ) ) {
2023-08-19 22:37:55 +02:00
return false ;
}
2024-07-02 16:09:59 +02:00
auto & transfer = _receiving_transfers . getTransfer ( e . group_number , e . peer_number , e . transfer_id ) ;
2023-08-19 22:37:55 +02:00
2024-06-30 14:03:06 +02:00
if ( transfer . isInfo ( ) ) {
auto & info = transfer . getInfo ( ) ;
auto o = info . content ;
2023-08-19 22:37:55 +02:00
2024-06-30 14:03:06 +02:00
if ( o . any_of < Components : : FT1InfoSHA1 , Components : : FT1InfoSHA1Data > ( ) ) {
2023-08-19 22:37:55 +02:00
// we already have the info, discard
2024-07-02 16:09:59 +02:00
_receiving_transfers . removePeerTransfer ( e . group_number , e . peer_number , e . transfer_id ) ;
2023-08-19 22:37:55 +02:00
return true ;
}
// check if data matches hash
auto hash = hash_sha1 ( info . info_data . data ( ) , info . info_data . size ( ) ) ;
2024-06-30 14:03:06 +02:00
assert ( o . all_of < Components : : FT1InfoSHA1Hash > ( ) ) ;
if ( o . get < Components : : FT1InfoSHA1Hash > ( ) . hash ! = hash ) {
2023-08-19 22:37:55 +02:00
std : : cerr < < " SHA1_NGCFT1 error: got info data mismatching its hash \n " ;
2024-06-30 14:03:06 +02:00
// TODO: requeue info request; eg manipulate o.get<Components::ReRequestInfoTimer>();
2024-07-02 16:09:59 +02:00
_receiving_transfers . removePeerTransfer ( e . group_number , e . peer_number , e . transfer_id ) ;
2023-08-19 22:37:55 +02:00
return true ;
}
2024-06-30 14:03:06 +02:00
const auto & info_data = o . emplace_or_replace < Components : : FT1InfoSHA1Data > ( std : : move ( info . info_data ) ) . data ;
auto & ft_info = o . emplace_or_replace < Components : : FT1InfoSHA1 > ( ) ;
2023-08-19 22:37:55 +02:00
ft_info . fromBuffer ( info_data ) ;
{ // file info
// TODO: not overwrite fi? since same?
2024-06-30 14:03:06 +02:00
auto & file_info = o . emplace_or_replace < Message : : Components : : Transfer : : FileInfo > ( ) ;
2023-08-19 22:37:55 +02:00
file_info . file_list . emplace_back ( ) = { ft_info . file_name , ft_info . file_size } ;
file_info . total_size = ft_info . file_size ;
}
std : : cout < < " SHA1_NGCFT1: got info for [ " < < SHA1Digest { hash } < < " ] \n " < < ft_info < < " \n " ;
2024-06-30 14:03:06 +02:00
o . remove < Components : : ReRequestInfoTimer > ( ) ;
if ( auto it = std : : find ( _queue_content_want_info . begin ( ) , _queue_content_want_info . end ( ) , o ) ; it ! = _queue_content_want_info . end ( ) ) {
2023-08-19 22:37:55 +02:00
_queue_content_want_info . erase ( it ) ;
}
2024-06-30 14:03:06 +02:00
o . emplace_or_replace < Message : : Components : : Transfer : : TagPaused > ( ) ;
2023-08-19 22:37:55 +02:00
2024-06-30 14:03:06 +02:00
updateMessages ( o ) ;
} else if ( transfer . isChunk ( ) ) {
auto o = transfer . getChunk ( ) . content ;
const auto & info = o . get < Components : : FT1InfoSHA1 > ( ) ;
auto & cc = o . get < Components : : FT1ChunkSHA1Cache > ( ) ;
2023-08-19 22:37:55 +02:00
// HACK: only check first chunk (they *should* all be the same)
2024-06-30 14:03:06 +02:00
const auto chunk_index = transfer . getChunk ( ) . chunk_indices . front ( ) ;
2023-08-22 17:20:56 +02:00
const uint64_t offset_into_file = chunk_index * uint64_t ( info . chunk_size ) ;
2023-08-19 22:37:55 +02:00
assert ( chunk_index < info . chunks . size ( ) ) ;
const auto chunk_size = info . chunkSize ( chunk_index ) ;
assert ( offset_into_file + chunk_size < = info . file_size ) ;
2024-06-30 14:03:06 +02:00
const auto chunk_data = o . get < Message : : Components : : Transfer : : File > ( ) - > read ( chunk_size , offset_into_file ) ;
2024-05-27 11:20:37 +02:00
assert ( ! chunk_data . empty ( ) ) ;
2023-08-19 22:37:55 +02:00
// check hash of chunk
2024-05-27 11:20:37 +02:00
auto got_hash = hash_sha1 ( chunk_data . ptr , chunk_data . size ) ;
2023-08-19 22:37:55 +02:00
if ( info . chunks . at ( chunk_index ) = = got_hash ) {
std : : cout < < " SHA1_NGCFT1: got chunk [ " < < SHA1Digest { got_hash } < < " ] \n " ;
if ( ! cc . have_all ) {
2024-06-30 14:03:06 +02:00
for ( const auto inner_chunk_index : transfer . getChunk ( ) . chunk_indices ) {
2024-06-25 12:08:17 +02:00
if ( ! cc . have_all & & ! cc . have_chunk [ inner_chunk_index ] ) {
cc . have_chunk . set ( inner_chunk_index ) ;
2023-08-19 22:37:55 +02:00
cc . have_count + = 1 ;
if ( cc . have_count = = info . chunks . size ( ) ) {
// debug check
2024-06-25 12:08:17 +02:00
for ( [[maybe_unused]] size_t i = 0 ; i < info . chunks . size ( ) ; i + + ) {
assert ( cc . have_chunk [ i ] ) ;
2023-08-19 22:37:55 +02:00
}
cc . have_all = true ;
2024-06-25 12:08:17 +02:00
cc . have_chunk = BitSet ( 0 ) ; // not wasting memory
2023-08-19 22:37:55 +02:00
std : : cout < < " SHA1_NGCFT1: got all chunks for \n " < < info < < " \n " ;
2023-08-22 21:13:09 +02:00
// HACK: remap file, to clear ram
// TODO: error checking
2024-06-30 14:03:06 +02:00
o . get < Message : : Components : : Transfer : : File > ( ) = std : : make_unique < File2RWMapped > (
o . get < Message : : Components : : Transfer : : FileInfoLocal > ( ) . file_list . front ( ) ,
2023-08-22 21:13:09 +02:00
info . file_size
) ;
2023-08-19 22:37:55 +02:00
}
// good chunk
// TODO: have wasted + metadata
2024-06-30 14:03:06 +02:00
o . get_or_emplace < Message : : Components : : Transfer : : BytesReceived > ( ) . total + = chunk_data . size ;
2023-08-19 22:37:55 +02:00
}
}
2024-06-23 15:12:31 +02:00
// queue chunk have for all participants
// HACK: send immediatly to all participants
2024-06-30 14:03:06 +02:00
for ( const auto c_part : o . get < Components : : SuspectedParticipants > ( ) . participants ) {
2024-06-23 15:12:31 +02:00
if ( ! _cr . all_of < Contact : : Components : : ToxGroupPeerEphemeral > ( c_part ) ) {
continue ;
}
const auto [ part_group_number , part_peer_number ] = _cr . get < Contact : : Components : : ToxGroupPeerEphemeral > ( c_part ) ;
2024-06-30 14:03:06 +02:00
const auto & info_hash = o . get < Components : : FT1InfoSHA1Hash > ( ) . hash ;
2024-06-23 15:12:31 +02:00
// convert size_t to uint32_t
const std : : vector < uint32_t > chunk_indices {
2024-06-30 14:03:06 +02:00
transfer . getChunk ( ) . chunk_indices . cbegin ( ) ,
transfer . getChunk ( ) . chunk_indices . cend ( )
2024-06-23 15:12:31 +02:00
} ;
_neep . send_ft1_have (
part_group_number , part_peer_number ,
static_cast < uint32_t > ( NGCFT1_file_kind : : HASH_SHA1_INFO ) ,
info_hash . data ( ) , info_hash . size ( ) ,
chunk_indices . data ( ) , chunk_indices . size ( )
) ;
}
2024-06-25 21:09:46 +02:00
if ( ! cc . have_all ) { // debug print self have set
std : : cout < < " DEBUG print have bitset: s: " < < cc . have_chunk . size_bits ( ) ;
for ( size_t i = 0 ; i < cc . have_chunk . size_bytes ( ) ; i + + ) {
2024-07-07 13:07:57 +02:00
if ( i % 32 = = 0 ) {
printf ( " \n " ) ;
2024-06-25 21:09:46 +02:00
}
2024-07-07 13:07:57 +02:00
// f cout
printf ( " %.2x " , ( uint16_t ) cc . have_chunk . data ( ) [ i ] ) ;
2024-06-25 21:09:46 +02:00
}
2024-07-07 13:07:57 +02:00
printf ( " \n " ) ;
2024-06-25 21:09:46 +02:00
}
2023-08-19 22:37:55 +02:00
} else {
std : : cout < < " SHA1_NGCFT1 warning: got chunk duplicate \n " ;
}
2024-07-08 18:12:47 +02:00
// something happend, update chunk picker
auto c = _tcm . getContactGroupPeer ( e . group_number , e . peer_number ) ;
assert ( static_cast < bool > ( c ) ) ;
c . emplace_or_replace < ChunkPickerUpdateTag > ( ) ;
2023-08-19 22:37:55 +02:00
} else {
// bad chunk
2023-08-21 21:23:13 +02:00
std : : cout < < " SHA1_NGCFT1: got BAD chunk from " < < e . group_number < < " : " < < e . peer_number < < " [ " < < info . chunks . at ( chunk_index ) < < " ] ; instead got [ " < < SHA1Digest { got_hash } < < " ] \n " ;
}
// remove from requested
// TODO: remove at init and track running transfers differently
2024-07-08 18:12:47 +02:00
// should be done, double check later
2024-06-30 14:03:06 +02:00
for ( const auto it : transfer . getChunk ( ) . chunk_indices ) {
o . get_or_emplace < Components : : FT1ChunkSHA1Requested > ( ) . chunks . erase ( it ) ;
2023-08-19 22:37:55 +02:00
}
2024-06-30 14:03:06 +02:00
updateMessages ( o ) ; // mostly for received bytes
2023-08-19 22:37:55 +02:00
}
2024-07-02 16:09:59 +02:00
_receiving_transfers . removePeerTransfer ( e . group_number , e . peer_number , e . transfer_id ) ;
2023-08-19 22:37:55 +02:00
return true ;
}
bool SHA1_NGCFT1 : : onEvent ( const Events : : NGCFT1_send_done & e ) {
2024-07-02 16:09:59 +02:00
if ( ! _sending_transfers . count ( combine_ids ( e . group_number , e . peer_number ) ) ) {
2023-08-19 22:37:55 +02:00
return false ;
}
2024-07-02 16:09:59 +02:00
auto & peer_transfers = _sending_transfers . at ( combine_ids ( e . group_number , e . peer_number ) ) ;
2023-08-19 22:37:55 +02:00
if ( ! peer_transfers . count ( e . transfer_id ) ) {
return false ;
}
const auto & tv = peer_transfers [ e . transfer_id ] . v ;
if ( std : : holds_alternative < SendingTransfer : : Chunk > ( tv ) ) {
updateMessages ( std : : get < SendingTransfer : : Chunk > ( tv ) . content ) ; // mostly for sent bytes
}
peer_transfers . erase ( e . transfer_id ) ;
return true ;
}
bool SHA1_NGCFT1 : : onEvent ( const Events : : NGCFT1_recv_message & e ) {
if ( e . file_kind ! = NGCFT1_file_kind : : HASH_SHA1_INFO ) {
return false ;
}
uint64_t ts = std : : chrono : : duration_cast < std : : chrono : : milliseconds > ( std : : chrono : : system_clock : : now ( ) . time_since_epoch ( ) ) . count ( ) ;
const auto c = _tcm . getContactGroupPeer ( e . group_number , e . peer_number ) ;
2024-07-09 11:00:59 +02:00
_tox_peer_to_contact [ combine_ids ( e . group_number , e . peer_number ) ] = c ; // workaround
2023-08-19 22:37:55 +02:00
const auto self_c = c . get < Contact : : Components : : Self > ( ) . self ;
auto * reg_ptr = _rmm . get ( c ) ;
if ( reg_ptr = = nullptr ) {
std : : cerr < < " SHA1_NGCFT1 error: cant find reg \n " ;
return false ;
}
Message3Registry & reg = * reg_ptr ;
// TODO: check for existence, hs or other syncing mechanics might have sent it already (or like, it arrived 2x or whatever)
2024-06-23 10:17:48 +02:00
// TODO: use the message dup test provided via rmm
2023-08-19 22:37:55 +02:00
auto new_msg_e = reg . create ( ) ;
{ // contact
// from
reg . emplace < Message : : Components : : ContactFrom > ( new_msg_e , c ) ;
// to
reg . emplace < Message : : Components : : ContactTo > ( new_msg_e , c . get < Contact : : Components : : Parent > ( ) . parent ) ;
}
reg . emplace < Message : : Components : : ToxGroupMessageID > ( new_msg_e , e . message_id ) ;
reg . emplace < Message : : Components : : Transfer : : TagReceiving > ( new_msg_e ) ; // add sending?
reg . emplace < Message : : Components : : TimestampProcessed > ( new_msg_e , ts ) ;
//reg.emplace<Components::TimestampWritten>(new_msg_e, 0);
reg . emplace < Message : : Components : : Timestamp > ( new_msg_e , ts ) ; // reactive?
2023-09-30 00:27:01 +02:00
reg . emplace < Message : : Components : : TagUnread > ( new_msg_e ) ;
2023-08-19 22:37:55 +02:00
{ // by whom
2024-01-12 18:55:41 +01:00
reg . get_or_emplace < Message : : Components : : SyncedBy > ( new_msg_e ) . ts . try_emplace ( self_c , ts ) ;
}
{ // we received it, so we have it
2024-04-20 15:12:05 +02:00
auto & rb = reg . get_or_emplace < Message : : Components : : ReceivedBy > ( new_msg_e ) . ts ;
rb . try_emplace ( c , ts ) ;
// TODO: how do we handle partial files???
// tox ft rn only sets self if the file was received fully
rb . try_emplace ( self_c , ts ) ;
2023-08-19 22:37:55 +02:00
}
// check if content exists
const auto sha1_info_hash = std : : vector < uint8_t > { e . file_id , e . file_id + e . file_id_size } ;
2024-04-29 11:55:11 +02:00
ObjectHandle ce ;
2023-08-19 22:37:55 +02:00
if ( _info_to_content . count ( sha1_info_hash ) ) {
ce = _info_to_content . at ( sha1_info_hash ) ;
std : : cout < < " SHA1_NGCFT1: new message has existing content \n " ;
} else {
2024-04-29 11:55:11 +02:00
// TODO: backend
ce = { _os . registry ( ) , _os . registry ( ) . create ( ) } ;
2023-08-19 22:37:55 +02:00
_info_to_content [ sha1_info_hash ] = ce ;
std : : cout < < " SHA1_NGCFT1: new message has new content \n " ;
//ce.emplace<Components::FT1InfoSHA1>(sha1_info);
//ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
ce . emplace < Components : : FT1InfoSHA1Hash > ( sha1_info_hash ) ;
}
ce . get_or_emplace < Components : : Messages > ( ) . messages . push_back ( { reg , new_msg_e } ) ;
reg_ptr - > emplace < Message : : Components : : Content > ( new_msg_e , ce ) ;
2024-06-28 15:13:17 +02:00
// HACK: assume the message sender is participating. usually a safe bet.
2024-07-08 18:12:47 +02:00
if ( addParticipation ( c , ce ) ) {
// something happend, update chunk picker
assert ( static_cast < bool > ( c ) ) ;
c . emplace_or_replace < ChunkPickerUpdateTag > ( ) ;
}
2023-08-19 22:37:55 +02:00
2024-07-03 12:11:20 +02:00
// HACK: assume the message sender has all
ce . get_or_emplace < Components : : RemoteHave > ( ) . others [ c ] = { true , { } } ;
2023-08-19 22:37:55 +02:00
if ( ! ce . all_of < Components : : ReRequestInfoTimer > ( ) & & ! ce . all_of < Components : : FT1InfoSHA1 > ( ) ) {
// TODO: check if already receiving
_queue_content_want_info . push_back ( ce ) ;
}
// TODO: queue info dl
//reg_ptr->emplace<Components::FT1InfoSHA1>(e, sha1_info);
//reg_ptr->emplace<Components::FT1InfoSHA1Data>(e, sha1_info_data); // keep around? or file?
//reg.emplace<Components::FT1InfoSHA1Hash>(new_msg_e, std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size});
if ( auto * cc = ce . try_get < Components : : FT1ChunkSHA1Cache > ( ) ; cc ! = nullptr & & cc - > have_all ) {
reg_ptr - > emplace < Message : : Components : : Transfer : : TagHaveAll > ( new_msg_e ) ;
}
if ( ce . all_of < Message : : Components : : Transfer : : FileInfo > ( ) ) {
reg_ptr - > emplace < Message : : Components : : Transfer : : FileInfo > ( new_msg_e , ce . get < Message : : Components : : Transfer : : FileInfo > ( ) ) ;
}
if ( ce . all_of < Message : : Components : : Transfer : : FileInfoLocal > ( ) ) {
reg_ptr - > emplace < Message : : Components : : Transfer : : FileInfoLocal > ( new_msg_e , ce . get < Message : : Components : : Transfer : : FileInfoLocal > ( ) ) ;
}
if ( ce . all_of < Message : : Components : : Transfer : : BytesSent > ( ) ) {
reg_ptr - > emplace < Message : : Components : : Transfer : : BytesSent > ( new_msg_e , ce . get < Message : : Components : : Transfer : : BytesSent > ( ) ) ;
}
// TODO: queue info/check if we already have info
_rmm . throwEventConstruct ( reg , new_msg_e ) ;
return true ; // false?
}
bool SHA1_NGCFT1 : : sendFilePath ( const Contact3 c , std : : string_view file_name , std : : string_view file_path ) {
if (
// TODO: add support of offline queuing
! _cr . all_of < Contact : : Components : : ToxGroupEphemeral > ( c )
) {
return false ;
}
std : : cout < < " SHA1_NGCFT1: got sendFilePath() \n " ;
auto * reg_ptr = _rmm . get ( c ) ;
if ( reg_ptr = = nullptr ) {
return false ;
}
// get current time unix epoch utc
uint64_t ts = std : : chrono : : duration_cast < std : : chrono : : milliseconds > ( std : : chrono : : system_clock : : now ( ) . time_since_epoch ( ) ) . count ( ) ;
2023-08-21 00:01:14 +02:00
std : : thread ( std : : move ( [
// copy everything
self = this ,
ts ,
c ,
reg_ptr ,
file_name_ = std : : string ( file_name ) ,
file_path_ = std : : string ( file_path )
] ( ) mutable {
2024-05-27 11:20:37 +02:00
auto file_impl = std : : make_unique < File2RWMapped > ( file_path_ , - 1 ) ;
2023-08-21 00:01:14 +02:00
if ( ! file_impl - > isGood ( ) ) {
{
std : : lock_guard l { self - > _info_builder_queue_mutex } ;
self - > _info_builder_queue . push_back ( [ file_path_ ] ( ) {
// back on iterate thread
std : : cerr < < " SHA1_NGCFT1 error: failed opening file ' " < < file_path_ < < " '! \n " ;
} ) ;
self - > _info_builder_dirty = true ; // still in scope, set before mutex unlock
}
return ;
2023-08-19 22:37:55 +02:00
}
2023-08-21 00:01:14 +02:00
// 1. build info by hashing all chunks
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
FT1InfoSHA1 sha1_info ;
// build info
sha1_info . file_name = file_name_ ;
2024-05-27 11:20:37 +02:00
sha1_info . file_size = file_impl - > _file_size ; // TODO: remove the reliance on implementation details
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
{ // build chunks
// HACK: load file fully
2024-05-27 11:20:37 +02:00
// ... its only a hack if its not memory mapped, but reading in chunk_sized chunks is probably a good idea anyway
const auto file_data = file_impl - > read ( file_impl - > _file_size , 0 ) ;
2023-08-21 00:01:14 +02:00
size_t i = 0 ;
2024-05-27 11:20:37 +02:00
for ( ; i + sha1_info . chunk_size < file_data . size ; i + = sha1_info . chunk_size ) {
sha1_info . chunks . push_back ( hash_sha1 ( file_data . ptr + i , sha1_info . chunk_size ) ) ;
2023-08-21 00:01:14 +02:00
}
2023-08-19 22:37:55 +02:00
2024-05-27 11:20:37 +02:00
if ( i < file_data . size ) {
sha1_info . chunks . push_back ( hash_sha1 ( file_data . ptr + i , file_data . size - i ) ) ;
2023-08-19 22:37:55 +02:00
}
}
2023-08-21 00:01:14 +02:00
file_impl . reset ( ) ;
{
std : : lock_guard l { self - > _info_builder_queue_mutex } ;
self - > _info_builder_queue . push_back ( std : : move ( [
self ,
ts ,
c ,
reg_ptr ,
file_name_ ,
file_path_ ,
sha1_info = std : : move ( sha1_info )
] ( ) mutable { //
// back on iterate thread
2024-05-27 11:20:37 +02:00
auto file_impl = std : : make_unique < File2RWMapped > ( file_path_ , sha1_info . file_size ) ;
2023-08-21 00:01:14 +02:00
if ( ! file_impl - > isGood ( ) ) {
std : : cerr < < " SHA1_NGCFT1 error: failed opening file ' " < < file_path_ < < " '! \n " ;
return ;
}
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
// 2. hash info
std : : vector < uint8_t > sha1_info_data ;
std : : vector < uint8_t > sha1_info_hash ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
std : : cout < < " SHA1_NGCFT1 info is: \n " < < sha1_info ;
sha1_info_data = sha1_info . toBuffer ( ) ;
std : : cout < < " SHA1_NGCFT1 sha1_info size: " < < sha1_info_data . size ( ) < < " \n " ;
sha1_info_hash = hash_sha1 ( sha1_info_data . data ( ) , sha1_info_data . size ( ) ) ;
std : : cout < < " SHA1_NGCFT1 sha1_info_hash: " < < bin2hex ( sha1_info_hash ) < < " \n " ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
// check if content exists
2024-04-29 11:55:11 +02:00
ObjectHandle ce ;
2023-08-21 00:01:14 +02:00
if ( self - > _info_to_content . count ( sha1_info_hash ) ) {
ce = self - > _info_to_content . at ( sha1_info_hash ) ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
// TODO: check if content is incomplete and use file instead
if ( ! ce . all_of < Components : : FT1InfoSHA1 > ( ) ) {
ce . emplace < Components : : FT1InfoSHA1 > ( sha1_info ) ;
}
if ( ! ce . all_of < Components : : FT1InfoSHA1Data > ( ) ) {
ce . emplace < Components : : FT1InfoSHA1Data > ( sha1_info_data ) ;
}
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
// hash has to be set already
// Components::FT1InfoSHA1Hash
{ // lookup tables and have
auto & cc = ce . get_or_emplace < Components : : FT1ChunkSHA1Cache > ( ) ;
cc . have_all = true ;
// skip have vec, since all
//cc.have_chunk
cc . have_count = sha1_info . chunks . size ( ) ; // need?
self - > _info_to_content [ sha1_info_hash ] = ce ;
cc . chunk_hash_to_index . clear ( ) ; // for cpy pst
for ( size_t i = 0 ; i < sha1_info . chunks . size ( ) ; i + + ) {
self - > _chunks [ sha1_info . chunks [ i ] ] = ce ;
cc . chunk_hash_to_index [ sha1_info . chunks [ i ] ] . push_back ( i ) ;
}
}
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
{ // file info
// TODO: not overwrite fi? since same?
auto & file_info = ce . emplace_or_replace < Message : : Components : : Transfer : : FileInfo > ( ) ;
file_info . file_list . emplace_back ( ) = { std : : string { file_name_ } , file_impl - > _file_size } ;
file_info . total_size = file_impl - > _file_size ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
ce . emplace_or_replace < Message : : Components : : Transfer : : FileInfoLocal > ( std : : vector { std : : string { file_path_ } } ) ;
}
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
// cleanup file
if ( ce . all_of < Message : : Components : : Transfer : : File > ( ) ) {
// replace
ce . remove < Message : : Components : : Transfer : : File > ( ) ;
}
ce . emplace < Message : : Components : : Transfer : : File > ( std : : move ( file_impl ) ) ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
if ( ! ce . all_of < Message : : Components : : Transfer : : BytesSent > ( ) ) {
ce . emplace < Message : : Components : : Transfer : : BytesSent > ( 0u ) ;
}
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
ce . remove < Message : : Components : : Transfer : : TagPaused > ( ) ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
// we dont want the info anymore
ce . remove < Components : : ReRequestInfoTimer > ( ) ;
if ( auto it = std : : find ( self - > _queue_content_want_info . begin ( ) , self - > _queue_content_want_info . end ( ) , ce ) ; it ! = self - > _queue_content_want_info . end ( ) ) {
self - > _queue_content_want_info . erase ( it ) ;
}
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
// TODO: we dont want chunks anymore
// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
} else {
2024-04-29 11:55:11 +02:00
// TODO: backend
ce = { self - > _os . registry ( ) , self - > _os . registry ( ) . create ( ) } ;
2023-08-21 00:01:14 +02:00
self - > _info_to_content [ sha1_info_hash ] = ce ;
ce . emplace < Components : : FT1InfoSHA1 > ( sha1_info ) ;
ce . emplace < Components : : FT1InfoSHA1Data > ( sha1_info_data ) ; // keep around? or file?
ce . emplace < Components : : FT1InfoSHA1Hash > ( sha1_info_hash ) ;
{ // lookup tables and have
auto & cc = ce . emplace < Components : : FT1ChunkSHA1Cache > ( ) ;
cc . have_all = true ;
// skip have vec, since all
//cc.have_chunk
cc . have_count = sha1_info . chunks . size ( ) ; // need?
self - > _info_to_content [ sha1_info_hash ] = ce ;
cc . chunk_hash_to_index . clear ( ) ; // for cpy pst
for ( size_t i = 0 ; i < sha1_info . chunks . size ( ) ; i + + ) {
self - > _chunks [ sha1_info . chunks [ i ] ] = ce ;
cc . chunk_hash_to_index [ sha1_info . chunks [ i ] ] . push_back ( i ) ;
}
}
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
{ // file info
auto & file_info = ce . emplace < Message : : Components : : Transfer : : FileInfo > ( ) ;
//const auto& file = ce.get<Message::Components::Transfer::File>();
file_info . file_list . emplace_back ( ) = { std : : string { file_name_ } , file_impl - > _file_size } ;
file_info . total_size = file_impl - > _file_size ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
ce . emplace < Message : : Components : : Transfer : : FileInfoLocal > ( std : : vector { std : : string { file_path_ } } ) ;
}
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
ce . emplace < Message : : Components : : Transfer : : File > ( std : : move ( file_impl ) ) ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
ce . emplace < Message : : Components : : Transfer : : BytesSent > ( 0u ) ;
}
2023-08-19 22:37:55 +02:00
2024-07-08 18:12:47 +02:00
// something happened, update all chunk pickers
if ( ce . all_of < Components : : SuspectedParticipants > ( ) ) {
for ( const auto & pcv : ce . get < Components : : SuspectedParticipants > ( ) . participants ) {
Contact3Handle pch { self - > _cr , pcv } ;
assert ( static_cast < bool > ( pch ) ) ;
pch . emplace_or_replace < ChunkPickerUpdateTag > ( ) ;
}
}
2023-08-21 00:01:14 +02:00
const auto c_self = self - > _cr . get < Contact : : Components : : Self > ( c ) . self ;
if ( ! self - > _cr . valid ( c_self ) ) {
std : : cerr < < " SHA1_NGCFT1 error: failed to get self! \n " ;
return ;
}
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
const auto msg_e = reg_ptr - > create ( ) ;
reg_ptr - > emplace < Message : : Components : : ContactTo > ( msg_e , c ) ;
reg_ptr - > emplace < Message : : Components : : ContactFrom > ( msg_e , c_self ) ;
reg_ptr - > emplace < Message : : Components : : Timestamp > ( msg_e , ts ) ; // reactive?
2023-09-30 00:27:01 +02:00
reg_ptr - > emplace < Message : : Components : : Read > ( msg_e , ts ) ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
reg_ptr - > emplace < Message : : Components : : Transfer : : TagHaveAll > ( msg_e ) ;
reg_ptr - > emplace < Message : : Components : : Transfer : : TagSending > ( msg_e ) ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
ce . get_or_emplace < Components : : Messages > ( ) . messages . push_back ( { * reg_ptr , msg_e } ) ;
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
//reg_ptr->emplace<Message::Components::Transfer::FileKind>(e, file_kind);
// file id would be sha1_info hash or something
//reg_ptr->emplace<Message::Components::Transfer::FileID>(e, file_id);
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
// remove? done in updateMessages() anyway
if ( ce . all_of < Message : : Components : : Transfer : : FileInfo > ( ) ) {
reg_ptr - > emplace < Message : : Components : : Transfer : : FileInfo > ( msg_e , ce . get < Message : : Components : : Transfer : : FileInfo > ( ) ) ;
}
if ( ce . all_of < Message : : Components : : Transfer : : FileInfoLocal > ( ) ) {
reg_ptr - > emplace < Message : : Components : : Transfer : : FileInfoLocal > ( msg_e , ce . get < Message : : Components : : Transfer : : FileInfoLocal > ( ) ) ;
}
if ( ce . all_of < Message : : Components : : Transfer : : BytesSent > ( ) ) {
reg_ptr - > emplace < Message : : Components : : Transfer : : BytesSent > ( msg_e , ce . get < Message : : Components : : Transfer : : BytesSent > ( ) ) ;
}
2023-08-19 22:37:55 +02:00
2023-08-21 00:01:14 +02:00
// TODO: determine if this is true
//reg_ptr->emplace<Message::Components::Transfer::TagPaused>(e);
if ( self - > _cr . any_of < Contact : : Components : : ToxGroupEphemeral > ( c ) ) {
const uint32_t group_number = self - > _cr . get < Contact : : Components : : ToxGroupEphemeral > ( c ) . group_number ;
uint32_t message_id = 0 ;
// TODO: check return
self - > _nft . NGC_FT1_send_message_public ( group_number , message_id , static_cast < uint32_t > ( NGCFT1_file_kind : : HASH_SHA1_INFO ) , sha1_info_hash . data ( ) , sha1_info_hash . size ( ) ) ;
reg_ptr - > emplace < Message : : Components : : ToxGroupMessageID > ( msg_e , message_id ) ;
} else if (
// non online group
self - > _cr . any_of < Contact : : Components : : ToxGroupPersistent > ( c )
) {
// create msg_id
const uint32_t message_id = randombytes_random ( ) ;
reg_ptr - > emplace < Message : : Components : : ToxGroupMessageID > ( msg_e , message_id ) ;
}
2023-08-19 22:37:55 +02:00
2024-04-20 15:12:05 +02:00
reg_ptr - > get_or_emplace < Message : : Components : : SyncedBy > ( msg_e ) . ts . try_emplace ( c_self , ts ) ;
reg_ptr - > get_or_emplace < Message : : Components : : ReceivedBy > ( msg_e ) . ts . try_emplace ( c_self , ts ) ;
2024-01-12 18:55:41 +01:00
2023-08-21 00:01:14 +02:00
self - > _rmm . throwEventConstruct ( * reg_ptr , msg_e ) ;
// TODO: place in iterate?
self - > updateMessages ( ce ) ;
} ) ) ;
self - > _info_builder_dirty = true ; // still in scope, set before mutex unlock
}
} ) ) . detach ( ) ;
2023-08-19 22:37:55 +02:00
return true ;
}
2024-06-22 14:08:12 +02:00
bool SHA1_NGCFT1 : : onToxEvent ( const Tox_Event_Group_Peer_Exit * e ) {
const auto group_number = tox_event_group_peer_exit_get_group_number ( e ) ;
const auto peer_number = tox_event_group_peer_exit_get_peer_id ( e ) ;
// peer disconnected
// - remove from all participantions
2024-07-07 14:15:26 +02:00
{
// FIXME: this does not work, tcm just delteded the relation ship
2024-07-09 11:00:59 +02:00
//auto c = _tcm.getContactGroupPeer(group_number, peer_number);
const auto c_it = _tox_peer_to_contact . find ( combine_ids ( group_number , peer_number ) ) ;
if ( c_it = = _tox_peer_to_contact . end ( ) ) {
return false ;
}
auto c = c_it - > second ;
2024-07-07 14:15:26 +02:00
if ( ! static_cast < bool > ( c ) ) {
return false ;
}
2024-06-22 14:08:12 +02:00
2024-07-08 18:12:47 +02:00
c . remove < ChunkPicker , ChunkPickerUpdateTag , ChunkPickerTimer > ( ) ;
2024-07-03 12:11:20 +02:00
2024-07-07 14:15:26 +02:00
for ( const auto & [ _ , o ] : _info_to_content ) {
removeParticipation ( c , o ) ;
2024-06-22 14:08:12 +02:00
2024-07-07 14:15:26 +02:00
if ( o . all_of < Components : : RemoteHave > ( ) ) {
o . get < Components : : RemoteHave > ( ) . others . erase ( c ) ;
}
2024-06-25 21:09:46 +02:00
}
2024-06-22 14:08:12 +02:00
}
// - clear queues
for ( auto it = _queue_requested_chunk . begin ( ) ; it ! = _queue_requested_chunk . end ( ) ; ) {
if ( group_number = = std : : get < 0 > ( * it ) & & peer_number = = std : : get < 1 > ( * it ) ) {
it = _queue_requested_chunk . erase ( it ) ;
} else {
it + + ;
}
}
2024-06-30 14:03:06 +02:00
// TODO: nfcft1 should have fired receive/send done events for all them running transfers
2024-06-22 14:08:12 +02:00
return false ;
}
2024-06-23 15:12:31 +02:00
bool SHA1_NGCFT1 : : onEvent ( const Events : : NGCEXT_ft1_have & e ) {
std : : cerr < < " SHA1_NGCFT1: FT1_HAVE s: " < < e . chunks . size ( ) < < " \n " ;
2024-06-25 21:09:46 +02:00
if ( e . file_kind ! = static_cast < uint32_t > ( NGCFT1_file_kind : : HASH_SHA1_INFO ) ) {
return false ;
}
SHA1Digest info_hash { e . file_id } ;
auto itc_it = _info_to_content . find ( info_hash ) ;
if ( itc_it = = _info_to_content . end ( ) ) {
// we are not interested and dont track this
return false ;
}
2024-06-28 21:10:01 +02:00
auto o = itc_it - > second ;
2024-06-25 21:09:46 +02:00
2024-06-28 21:10:01 +02:00
if ( ! static_cast < bool > ( o ) ) {
2024-06-25 21:09:46 +02:00
std : : cerr < < " SHA1_NGCFT1 error: tracking info has null object \n " ;
return false ;
}
2024-06-28 21:10:01 +02:00
const size_t num_total_chunks = o . get < Components : : FT1InfoSHA1 > ( ) . chunks . size ( ) ;
2024-06-25 21:09:46 +02:00
const auto c = _tcm . getContactGroupPeer ( e . group_number , e . peer_number ) ;
2024-07-09 14:45:00 +02:00
assert ( static_cast < bool > ( c ) ) ;
2024-07-09 11:00:59 +02:00
_tox_peer_to_contact [ combine_ids ( e . group_number , e . peer_number ) ] = c ; // workaround
2024-06-25 21:09:46 +02:00
2024-06-28 15:13:17 +02:00
// we might not know yet
2024-07-08 18:12:47 +02:00
if ( addParticipation ( c , o ) ) {
// something happend, update chunk picker
c . emplace_or_replace < ChunkPickerUpdateTag > ( ) ;
}
2024-06-28 15:13:17 +02:00
2024-06-28 21:10:01 +02:00
auto & remote_have = o . get_or_emplace < Components : : RemoteHave > ( ) . others ;
2024-06-25 21:09:46 +02:00
if ( ! remote_have . contains ( c ) ) {
// init
remote_have . emplace ( c , Components : : RemoteHave : : Entry { false , num_total_chunks } ) ;
2024-07-09 14:45:00 +02:00
// new have? nice
// (always update on biset, not always on have)
c . emplace_or_replace < ChunkPickerUpdateTag > ( ) ;
2024-06-25 21:09:46 +02:00
}
auto & remote_have_peer = remote_have . at ( c ) ;
if ( ! remote_have_peer . have_all ) {
assert ( remote_have_peer . have . size_bits ( ) > = num_total_chunks ) ;
for ( const auto c_i : e . chunks ) {
if ( c_i > = num_total_chunks ) {
std : : cerr < < " SHA1_NGCFT1 error: remote sent have with out-of-range chunk index!!! \n " ;
2024-07-03 12:11:20 +02:00
std : : cerr < < info_hash < < " : " < < c_i < < " >= " < < num_total_chunks < < " \n " ;
2024-06-25 21:09:46 +02:00
continue ;
}
assert ( c_i < num_total_chunks ) ;
remote_have_peer . have . set ( c_i ) ;
}
// check for completion?
// TODO: optimize
bool test_all { true } ;
for ( size_t i = 0 ; i < remote_have_peer . have . size_bits ( ) ; i + + ) {
if ( ! remote_have_peer . have [ i ] ) {
test_all = false ;
break ;
}
}
if ( test_all ) {
// optimize
remote_have_peer . have_all = true ;
remote_have_peer . have = BitSet { } ;
}
}
return true ;
2024-06-23 15:12:31 +02:00
}
bool SHA1_NGCFT1 : : onEvent ( const Events : : NGCEXT_ft1_bitset & e ) {
2024-07-10 12:27:19 +02:00
std : : cerr < < " SHA1_NGCFT1: got FT1_BITSET o: " < < e . start_chunk < < " s: " < < e . chunk_bitset . size ( ) * 8 < < " \n " ;
2024-06-25 21:09:46 +02:00
if ( e . file_kind ! = static_cast < uint32_t > ( NGCFT1_file_kind : : HASH_SHA1_INFO ) ) {
return false ;
}
if ( e . chunk_bitset . empty ( ) ) {
// what
return false ;
}
SHA1Digest info_hash { e . file_id } ;
auto itc_it = _info_to_content . find ( info_hash ) ;
if ( itc_it = = _info_to_content . end ( ) ) {
// we are not interested and dont track this
return false ;
}
2024-06-28 21:10:01 +02:00
auto o = itc_it - > second ;
2024-06-25 21:09:46 +02:00
2024-06-28 21:10:01 +02:00
if ( ! static_cast < bool > ( o ) ) {
2024-06-25 21:09:46 +02:00
std : : cerr < < " SHA1_NGCFT1 error: tracking info has null object \n " ;
return false ;
}
2024-06-28 21:10:01 +02:00
const size_t num_total_chunks = o . get < Components : : FT1InfoSHA1 > ( ) . chunks . size ( ) ;
2024-07-10 12:27:19 +02:00
// +7 for byte rounding
if ( num_total_chunks + 7 < e . start_chunk + ( e . chunk_bitset . size ( ) * 8 ) ) {
2024-06-25 21:09:46 +02:00
std : : cerr < < " SHA1_NGCFT1 error: got bitset.size+start that is larger then number of chunks!! \n " ;
2024-07-10 12:27:19 +02:00
std : : cerr < < " total: " < < num_total_chunks < < " start: " < < e . start_chunk < < " size: " < < e . chunk_bitset . size ( ) * 8 < < " \n " ;
2024-06-25 21:09:46 +02:00
return false ;
}
const auto c = _tcm . getContactGroupPeer ( e . group_number , e . peer_number ) ;
2024-07-09 14:45:00 +02:00
assert ( static_cast < bool > ( c ) ) ;
2024-07-09 11:00:59 +02:00
_tox_peer_to_contact [ combine_ids ( e . group_number , e . peer_number ) ] = c ; // workaround
2024-06-25 21:09:46 +02:00
2024-06-28 15:13:17 +02:00
// we might not know yet
2024-07-09 14:45:00 +02:00
addParticipation ( c , o ) ;
2024-06-28 15:13:17 +02:00
2024-06-28 21:10:01 +02:00
auto & remote_have = o . get_or_emplace < Components : : RemoteHave > ( ) . others ;
2024-06-25 21:09:46 +02:00
if ( ! remote_have . contains ( c ) ) {
// init
remote_have . emplace ( c , Components : : RemoteHave : : Entry { false , num_total_chunks } ) ;
}
auto & remote_have_peer = remote_have . at ( c ) ;
if ( ! remote_have_peer . have_all ) { // TODO: maybe unset with bitset?
BitSet event_bitset { e . chunk_bitset } ;
remote_have_peer . have . merge ( event_bitset , e . start_chunk ) ;
// check for completion?
// TODO: optimize
bool test_all { true } ;
for ( size_t i = 0 ; i < remote_have_peer . have . size_bits ( ) ; i + + ) {
if ( ! remote_have_peer . have [ i ] ) {
test_all = false ;
break ;
}
}
if ( test_all ) {
// optimize
remote_have_peer . have_all = true ;
remote_have_peer . have = BitSet { } ;
}
}
2024-07-09 14:45:00 +02:00
// new have? nice
// (always update on biset, not always on have)
c . emplace_or_replace < ChunkPickerUpdateTag > ( ) ;
2024-06-25 21:09:46 +02:00
return true ;
2024-06-23 15:12:31 +02:00
}
2024-06-22 17:01:52 +02:00
bool SHA1_NGCFT1 : : onEvent ( const Events : : NGCEXT_pc1_announce & e ) {
2024-06-23 10:17:48 +02:00
std : : cerr < < " SHA1_NGCFT1: PC1_ANNOUNCE s: " < < e . id . size ( ) < < " \n " ;
2024-06-22 17:01:52 +02:00
// id is file_kind + id
uint32_t file_kind = 0u ;
static_assert ( SHA1Digest { } . size ( ) = = 20 ) ;
if ( e . id . size ( ) ! = sizeof ( file_kind ) + 20 ) {
// not for us
return false ;
}
for ( size_t i = 0 ; i < sizeof ( file_kind ) ; i + + ) {
file_kind | = uint32_t ( e . id [ i ] ) < < ( i * 8 ) ;
}
2024-06-25 21:09:46 +02:00
if ( file_kind ! = static_cast < uint32_t > ( NGCFT1_file_kind : : HASH_SHA1_INFO ) ) {
return false ;
}
2024-07-09 11:00:59 +02:00
2024-06-22 17:01:52 +02:00
SHA1Digest hash { e . id . data ( ) + sizeof ( file_kind ) , 20 } ;
// if have use hash(-info) for file, add to participants
std : : cout < < " SHA1_NGCFT1: got ParticipationChatter1 announce from " < < e . group_number < < " : " < < e . peer_number < < " for " < < hash < < " \n " ;
auto itc_it = _info_to_content . find ( hash ) ;
if ( itc_it = = _info_to_content . end ( ) ) {
// we are not interested and dont track this
return false ;
}
2024-07-08 18:12:47 +02:00
// add to participants
2024-06-22 17:01:52 +02:00
const auto c = _tcm . getContactGroupPeer ( e . group_number , e . peer_number ) ;
2024-07-09 11:00:59 +02:00
_tox_peer_to_contact [ combine_ids ( e . group_number , e . peer_number ) ] = c ; // workaround
2024-06-28 21:10:01 +02:00
auto o = itc_it - > second ;
2024-07-08 18:12:47 +02:00
if ( addParticipation ( c , o ) ) {
// something happend, update chunk picker
2024-07-10 12:27:19 +02:00
// !!! this is probably too much
2024-07-08 18:12:47 +02:00
assert ( static_cast < bool > ( c ) ) ;
c . emplace_or_replace < ChunkPickerUpdateTag > ( ) ;
2024-06-22 17:01:52 +02:00
std : : cout < < " SHA1_NGCFT1: and we where interested! \n " ;
2024-06-23 15:12:31 +02:00
// we should probably send the bitset back here / add to queue (can be multiple packets)
2024-07-10 12:27:19 +02:00
if ( o . all_of < Components : : FT1ChunkSHA1Cache > ( ) & & o . get < Components : : FT1ChunkSHA1Cache > ( ) . have_count > 0 ) {
queueBitsetSendFull ( c , o ) ;
}
2024-06-22 17:01:52 +02:00
}
return false ;
}