2023-08-08 20:13:10 +02:00
#include "./sha1_ngcft1.hpp"
#include <solanaceae/toxcore/utils.hpp>
2023-08-08 23:55:12 +02:00
#include <solanaceae/contact/components.hpp>
#include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/message3/components.hpp>
2023-08-10 13:01:57 +02:00
#include <solanaceae/tox_messages/components.hpp>
2023-08-08 23:55:12 +02:00
#include <solanaceae/message3/file_r_file.hpp>
2023-08-17 22:43:16 +02:00
#include <solanaceae/message3/file_rw_file.hpp>
2023-08-08 23:55:12 +02:00
#include "./ft1_sha1_info.hpp"
#include "./hash_utils.hpp"
2023-08-10 13:01:57 +02:00
#include <sodium.h>
2023-08-14 17:08:27 +02:00
#include <entt/container/dense_set.hpp>
2023-08-08 20:13:10 +02:00
#include <iostream>
2023-08-09 00:25:08 +02:00
#include <variant>
2023-08-17 22:43:16 +02:00
#include <filesystem>
2023-08-08 20:13:10 +02:00
2023-08-13 17:12:46 +02:00
// Alias that lets message entities carry a ContentHandle as a component
// named Message::Components::Content (used via get<>/emplace<> in this file).
namespace Message::Components {
using Content = ContentHandle;
} // Message::Components
// TODO: rename to content components
2023-08-08 23:55:12 +02:00
// Components attached to content entities (the ContentHandle objects used throughout this file).
namespace Components {
2023-08-13 17:12:46 +02:00
// Handles of the messages that reference this content.
struct Messages {
std::vector<Message3Handle> messages;
2023-08-08 23:55:12 +02:00
// Makes the parsed info type available under the Components namespace.
using FT1InfoSHA1 = FT1InfoSHA1;
// Raw info buffer: in this file it is either FT1InfoSHA1::toBuffer() output or the bytes received in an info transfer.
struct FT1InfoSHA1Data {
std::vector<uint8_t> data;
// Hash of the info buffer; received info data is compared against it via hash_sha1().
struct FT1InfoSHA1Hash {
std::vector<uint8_t> hash;
2023-08-09 23:02:29 +02:00
// Bookkeeping of which chunks of the content are available locally.
struct FT1ChunkSHA1Cache {
// one flag per chunk index
std::vector<bool> have_chunk;
// short cut: when true, haveChunk() does not consult have_chunk
bool have_all {false};
size_t have_count {0};
// reverse lookup from chunk hash to chunk index
entt::dense_map<SHA1Digest, size_t> chunk_hash_to_index;
// index recorded for hash, or std::nullopt if the hash is unknown
std::optional<size_t> chunkIndex(const SHA1Digest& hash) const;
// true if have_all is set or the flag for this hash's chunk is set
bool haveChunk(const SHA1Digest& hash) const;
2023-08-14 17:08:27 +02:00
// Contacts used as request candidates by selectPeerForRequest().
struct SuspectedParticipants {
entt::dense_set<Contact3> participants;
// Accumulates the delta passed to iterate(); compared against a 15.f threshold there.
struct ReRequestInfoTimer {
float timer {0.f};
2023-08-17 22:43:16 +02:00
struct ReadHeadHint {
// points to the first byte we want
// this is just a hint, that can be set from outside
// to guide the sequential "piece picker" strategy
// the strategy *should* set this to the first byte we dont yet have
uint64_t offset_into_file {0u};
2023-08-08 23:55:12 +02:00
} // Components
2023-08-09 23:02:29 +02:00
// Looks up the chunk index recorded for the given chunk hash.
// Yields std::nullopt when the hash is not present in chunk_hash_to_index.
std::optional<size_t> Components::FT1ChunkSHA1Cache::chunkIndex(const SHA1Digest& hash) const {
	if (const auto found = chunk_hash_to_index.find(hash); found != chunk_hash_to_index.cend()) {
		return found->second;
	}

	return std::nullopt;
}
// Tells whether the chunk identified by hash is available.
// have_all short-circuits the per-chunk lookup; hashes unknown to this
// content always report false.
bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
	// short cut: everything is present
	if (have_all) {
		return true;
	}

	const auto index_opt = chunkIndex(hash);
	if (!index_opt.has_value()) {
		// not part of this file
		return false;
	}

	return have_chunk[index_opt.value()];
}
// Size of the chunk at chunk_index: every chunk is sha1_info.chunk_size,
// except the final one, which holds whatever remains of the file.
static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
	const bool is_last_chunk = (chunk_index+1 == sha1_info.chunks.size());
	if (!is_last_chunk) {
		return sha1_info.chunk_size;
	}

	// last chunk
	return sha1_info.file_size - chunk_index * sha1_info.chunk_size;
}
2023-08-13 17:12:46 +02:00
// Records that a peer asked for a chunk of `content`; iterate() later takes
// entries from the front of _queue_requested_chunk and starts the transfer.
// An already queued (group, peer, hash) entry gets its timer reset to 0.
void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash) {
2023-08-09 23:02:29 +02:00
// TODO: transfers
for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) {
// if already in queue
if (i_g == group_number && i_p == peer_number && i_h == hash) {
// update timer
i_t = 0.f;
// NOTE(review): no early exit is visible after the timer reset in this view -- confirm a duplicate entry is not appended below
// not in queue yet
2023-08-13 17:12:46 +02:00
_queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, content, hash, 0.f));
2023-08-09 23:02:29 +02:00
2023-08-09 00:25:08 +02:00
// Packs a group number and a peer number into one 64 bit key:
// group_number occupies the upper 32 bits, peer_number the lower 32 bits.
uint64_t SHA1_NGCFT1::combineIds(const uint32_t group_number, const uint32_t peer_number) {
	const uint64_t upper = uint64_t(group_number) << 32;
	const uint64_t lower = peer_number;
	return upper | lower;
}
2023-08-13 17:12:46 +02:00
// Visits every message listed in the content's Components::Messages and
// inspects which transfer related components the content currently has.
// NOTE(review): the bodies of the branches below are not visible in this view;
// presumably they mirror the content's components onto each message -- confirm.
void SHA1_NGCFT1::updateMessages(ContentHandle ce) {
for (auto msg : ce.get<Components::Messages>().messages) {
// content has file info the message does not have yet
if (ce.all_of<Message::Components::Transfer::FileInfo>() && !msg.all_of<Message::Components::Transfer::FileInfo>()) {
if (ce.all_of<Message::Components::Transfer::FileInfoLocal>()) {
if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
2023-08-15 16:06:54 +02:00
if (ce.all_of<Message::Components::Transfer::TagPaused>()) {
} else {
2023-08-13 17:12:46 +02:00
// content is fully available locally
if (auto* cc = ce.try_get<Components::FT1ChunkSHA1Cache>(); cc != nullptr && cc->have_all) {
2023-08-16 01:43:12 +02:00
// Chooses a (group_number, peer_number) pair to send a request for this content to.
// Candidates come from the content's SuspectedParticipants; with a 1 in 20 chance
// (or when there are no candidates) a peer from the group's children is considered instead.
// Falls through to std::nullopt when no branch returns a pair.
std::optional<std::pair<uint32_t, uint32_t>> SHA1_NGCFT1::selectPeerForRequest(ContentHandle ce) {
// get a list of peers we can request this file from
// TODO: randomly request from non SuspectedParticipants
std::vector<std::pair<uint32_t, uint32_t>> tox_peers;
for (const auto c : ce.get<Components::SuspectedParticipants>().participants) {
// TODO: sort by con state?
// prio to direct?
// NOTE(review): the body of this check is not visible here; presumably contacts without a connection state or that are disconnected are skipped -- confirm
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(c); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(c);
tox_peers.push_back({tgpe.group_number, tgpe.peer_number});
// 1 in 20 chance to ask random peer instead
// TODO: config + tweak
// TODO: save group in content to avoid the tox_peers list build
if (tox_peers.empty() || (_rng()%20) == 0) {
// meh
// HACK: determine group based on last tox_peers
if (!tox_peers.empty()) {
const uint32_t group_number = tox_peers.back().first;
auto gch = _tcm.getContactGroup(group_number);
std::vector<uint32_t> un_tox_peers;
for (const auto child : gch.get<Contact::Components::ParentOf>().subs) {
if (const auto* cs = _cr.try_get<Contact::Components::ConnectionState>(child); cs == nullptr || cs->state == Contact::Components::ConnectionState::State::disconnected) {
if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(child)) {
const auto& tgpe = _cr.get<Contact::Components::ToxGroupPeerEphemeral>(child);
// NOTE(review): the statement that fills un_tox_peers is not visible in this view
if (un_tox_peers.empty()) {
// no one online, we are out of luck
} else {
// uniform-ish pick via modulo of the rng output
const size_t sample_i = _rng()%un_tox_peers.size();
const auto peer_number = un_tox_peers.at(sample_i);
return std::make_pair(group_number, peer_number);
} else {
// regular case: pick one of the suspected participants
const size_t sample_i = _rng()%tox_peers.size();
const auto [group_number, peer_number] = tox_peers.at(sample_i);
return std::make_pair(group_number, peer_number);
return std::nullopt;
2023-08-08 20:13:10 +02:00
2023-08-08 23:55:12 +02:00
// Constructor parameters and body; subscribes this object to the message model
// and NGCFT1 events it handles via the onEvent()/sendFilePath() overloads.
// NOTE(review): the signature line and the member initializer list are not visible in this view.
Contact3Registry& cr,
2023-08-08 20:13:10 +02:00
RegistryMessageModel& rmm,
2023-08-08 23:55:12 +02:00
NGCFT1& nft,
2023-08-08 20:13:10 +02:00
ToxContactModel2& tcm
) :
2023-08-08 23:55:12 +02:00
2023-08-08 20:13:10 +02:00
2023-08-08 23:55:12 +02:00
2023-08-08 20:13:10 +02:00
2023-08-17 22:43:16 +02:00
// TODO: also create and destroy
_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
2023-08-08 23:55:12 +02:00
// file transfer events
_nft.subscribe(this, NGCFT1_Event::recv_request);
_nft.subscribe(this, NGCFT1_Event::recv_init);
_nft.subscribe(this, NGCFT1_Event::recv_data);
_nft.subscribe(this, NGCFT1_Event::send_data);
2023-08-09 23:49:28 +02:00
_nft.subscribe(this, NGCFT1_Event::recv_done);
_nft.subscribe(this, NGCFT1_Event::send_done);
2023-08-11 12:03:01 +02:00
_nft.subscribe(this, NGCFT1_Event::recv_message);
2023-08-08 23:55:12 +02:00
//_rmm.subscribe(this, RegistryMessageModel_Event::message_construct);
//_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
//_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);
// outgoing file sends are routed to sendFilePath()
_rmm.subscribe(this, RegistryMessageModel_Event::send_file_path);
2023-08-08 20:13:10 +02:00
2023-08-09 23:02:29 +02:00
// Periodic tick. `delta` is added to every timer handled here.
// 1. ages sending/receiving transfers and queued chunk requests, dropping those at >= 10.f
// 2. ages the ReRequestInfoTimer components and re-queues timed out contents for info
// 3. if below _max_concurrent_out, starts sending the chunk at the front of _queue_requested_chunk
// 4. if below _max_concurrent_in, requests info (strictly first) or chunks from a selected peer
// NOTE(review): several statements (iterator advances, call heads, queue pops) are not visible in this view.
void SHA1_NGCFT1::iterate(float delta) {
{ // timers
2023-08-10 13:01:57 +02:00
// sending transfers
2023-08-09 23:02:29 +02:00
for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) {
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
it->second.time_since_activity += delta;
// if we have not heard for 10sec, timeout
if (it->second.time_since_activity >= 10.f) {
2023-08-14 18:33:17 +02:00
std::cerr << "SHA1_NGCFT1 warning: sending tansfer timed out " << "." << int(it->first) << "\n";
2023-08-09 23:02:29 +02:00
// erase() returns the next valid iterator
it = peer_it->second.erase(it);
} else {
if (peer_it->second.empty()) {
// cleanup unused peers too aggressive?
peer_it = _sending_transfers.erase(peer_it);
} else {
2023-08-15 16:06:54 +02:00
// receiving transfers
for (auto peer_it = _receiving_transfers.begin(); peer_it != _receiving_transfers.end();) {
for (auto it = peer_it->second.begin(); it != peer_it->second.end();) {
it->second.time_since_activity += delta;
// if we have not heard for 10sec, timeout
if (it->second.time_since_activity >= 10.f) {
std::cerr << "SHA1_NGCFT1 warning: receiving tansfer timed out " << "." << int(it->first) << "\n";
// TODO: if info, requeue? or just keep the timer comp? - no, timer comp will continue ticking, even if loading
it = peer_it->second.erase(it);
} else {
if (peer_it->second.empty()) {
// cleanup unused peers too aggressive?
peer_it = _receiving_transfers.erase(peer_it);
} else {
2023-08-09 23:02:29 +02:00
// queued requests
for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) {
// the float element of the tuple is the entry's age
float& timer = std::get<float>(*it);
timer += delta;
if (timer >= 10.f) {
it = _queue_requested_chunk.erase(it);
} else {
2023-08-14 17:08:27 +02:00
{ // requested info timers
// collected first, so the queue is only modified after the view iteration
std::vector<Content> timed_out;
_contentr.view<Components::ReRequestInfoTimer>().each([delta, &timed_out](Content e, Components::ReRequestInfoTimer& rrit) {
rrit.timer += delta;
// 15sec, TODO: config
if (rrit.timer >= 15.f) {
for (const auto e : timed_out) {
// TODO: avoid dups
_queue_content_want_info.push_back({_contentr, e});
2023-08-09 23:02:29 +02:00
// if we have not reached the total cap for transfers
// count running transfers
2023-08-14 17:08:27 +02:00
size_t running_sending_transfer_count {0};
2023-08-09 23:02:29 +02:00
for (const auto& [_, transfers] : _sending_transfers) {
2023-08-14 17:08:27 +02:00
running_sending_transfer_count += transfers.size();
size_t running_receiving_transfer_count {0};
for (const auto& [_, transfers] : _receiving_transfers) {
running_receiving_transfer_count += transfers.size();
2023-08-09 23:02:29 +02:00
2023-08-14 17:08:27 +02:00
if (running_sending_transfer_count < _max_concurrent_out) {
// TODO: for each peer? transfer cap per peer?
// TODO: info queue
2023-08-09 23:02:29 +02:00
if (!_queue_requested_chunk.empty()) { // then check for chunk requests
2023-08-14 17:08:27 +02:00
const auto [group_number, peer_number, ce, chunk_hash, _] = _queue_requested_chunk.front();
2023-08-09 23:02:29 +02:00
2023-08-14 17:08:27 +02:00
auto chunk_idx_opt = ce.get<Components::FT1ChunkSHA1Cache>().chunkIndex(chunk_hash);
2023-08-09 23:02:29 +02:00
if (chunk_idx_opt.has_value()) {
2023-08-14 17:08:27 +02:00
const auto& info = ce.get<Components::FT1InfoSHA1>();
2023-08-09 23:02:29 +02:00
uint8_t transfer_id {0};
// announce the transfer to the peer; the chunk hash is the file id
if (_nft.NGC_FT1_send_init_private(
group_number, peer_number,
chunk_hash.data.data(), chunk_hash.size(),
chunkSize(info, chunk_idx_opt.value()),
)) {
// remember the transfer, keyed by peer and transfer id, with the chunk's byte offset in the file
[combineIds(group_number, peer_number)]
[transfer_id] // TODO: also save index?
2023-08-14 17:08:27 +02:00
.v = SendingTransfer::Chunk{ce, chunk_idx_opt.value() * info.chunk_size};
2023-08-09 23:02:29 +02:00
// remove from queue regardless
2023-08-14 17:08:27 +02:00
if (running_receiving_transfer_count < _max_concurrent_in) {
// strictly prioritize info
if (!_queue_content_want_info.empty()) {
const auto ce = _queue_content_want_info.front();
// make sure we are missing the info
2023-08-16 01:43:12 +02:00
auto selected_peer_opt = selectPeerForRequest(ce);
if (selected_peer_opt.has_value()) {
const auto [group_number, peer_number] = selected_peer_opt.value();
2023-08-14 17:08:27 +02:00
//const auto& info = msg.get<Components::FT1InfoSHA1>();
const auto& info_hash = ce.get<Components::FT1InfoSHA1Hash>().hash;
group_number, peer_number,
info_hash.data(), info_hash.size()
std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n";
} else if (!_queue_content_want_chunk.empty()) {
2023-08-16 01:43:12 +02:00
const auto ce = _queue_content_want_chunk.front();
// select chunk/make sure we still need one
auto selected_peer_opt = selectPeerForRequest(ce);
if (selected_peer_opt.has_value()) {
const auto [group_number, peer_number] = selected_peer_opt.value();
2023-08-17 22:43:16 +02:00
// chunk requesting is only logged in the visible code
std::cout << "SHA1_NGCFT1: should ask " << group_number << ":" << peer_number << " for content here\n";
2023-08-16 01:43:12 +02:00
// ...
2023-08-17 22:43:16 +02:00
// Reacts to a message update that carries an ActionAccept for a message with content:
// opens the destination file (save_to_path + info.file_name) for writing.
// Every visible return path yields false.
bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
// see tox_transfer_manager.cpp for reference
if (!e.e.all_of<Message::Components::Transfer::ActionAccept, Message::Components::Content>()) {
return false;
//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path);
auto ce = e.e.get<Message::Components::Content>();
//if (!ce.all_of<Components::FT1InfoSHA1, Components::FT1ChunkSHA1Cache>()) {
if (!ce.all_of<Components::FT1InfoSHA1>()) {
// not ready to load yet, skip
return false;
// first, open file for write(+readback)
std::string full_file_path{e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path};
// TODO: replace with filesystem or something
// TODO: ensure dir exists
// NOTE(review): back() on an empty save_to_path would be undefined behavior -- confirm callers never pass an empty path
if (full_file_path.back() != '/') {
full_file_path += "/";
const auto& info = ce.get<Components::FT1InfoSHA1>();
full_file_path += info.file_name;
// HACK truncate
auto file_impl = std::make_unique<FileRWFile>(full_file_path, info.file_size, true);
if (!file_impl->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
//e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop
return false;
// next, create chunk cache and check for existing data
// now, enqueue
// should?
return false;
2023-08-08 20:13:10 +02:00
// Handles a peer's request for SHA1 info or a SHA1 chunk.
// Info requests for known content with available info data start an info transfer;
// chunk requests for chunks we have are queued via queueUpRequestChunk().
// Returns false for other file kinds, malformed ids (size != 20) or unknown/unavailable data.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
// only interested in sha1
if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) {
return false;
//std::cout << "SHA1_NGCFT1: FT1_REQUEST fk:" << int(e.file_kind) << " [" << bin2hex({e.file_id, e.file_id+e.file_id_size}) << "]\n";
2023-08-08 23:55:12 +02:00
if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) {
// the file id has to be exactly 20 bytes
if (e.file_id_size != 20) {
// error
return false;
SHA1Digest info_hash{e.file_id, e.file_id_size};
2023-08-13 17:12:46 +02:00
if (!_info_to_content.count(info_hash)) {
2023-08-08 23:55:12 +02:00
// we dont know about this
return false;
2023-08-13 17:12:46 +02:00
auto content = _info_to_content.at(info_hash);
2023-08-08 23:55:12 +02:00
2023-08-13 17:12:46 +02:00
if (!content.all_of<Components::FT1InfoSHA1Data>()) {
// we dont have the info for that infohash (yet?)
return false;
2023-08-08 23:55:12 +02:00
2023-08-09 23:02:29 +02:00
// TODO: queue instead
//queueUpRequestInfo(e.group_number, e.peer_number, info_hash);
2023-08-09 00:25:08 +02:00
uint8_t transfer_id {0};
2023-08-08 23:55:12 +02:00
// NOTE(review): the call these arguments belong to is not visible in this view
e.group_number, e.peer_number,
e.file_id, e.file_id_size,
2023-08-13 17:12:46 +02:00
2023-08-09 00:25:08 +02:00
2023-08-08 23:55:12 +02:00
2023-08-09 00:25:08 +02:00
// the sending transfer keeps its own copy of the info data
[combineIds(e.group_number, e.peer_number)]
2023-08-13 17:12:46 +02:00
.v = SendingTransfer::Info{content.get<Components::FT1InfoSHA1Data>().data};
2023-08-09 23:02:29 +02:00
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
if (e.file_id_size != 20) {
// error
return false;
SHA1Digest chunk_hash{e.file_id, e.file_id_size};
if (!_chunks.count(chunk_hash)) {
// we dont know about this
return false;
2023-08-14 17:08:27 +02:00
auto ce = _chunks.at(chunk_hash);
{ // they advertise interest in the content
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
2023-08-09 23:02:29 +02:00
2023-08-16 01:43:12 +02:00
2023-08-09 23:02:29 +02:00
2023-08-14 17:08:27 +02:00
if (!ce.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) {
2023-08-09 23:02:29 +02:00
// we dont have the chunk
return false;
// queue good request
2023-08-14 17:08:27 +02:00
queueUpRequestChunk(e.group_number, e.peer_number, ce, chunk_hash);
2023-08-09 23:02:29 +02:00
} else {
assert(false && "unhandled case");
2023-08-08 23:55:12 +02:00
2023-08-09 23:02:29 +02:00
return true;
2023-08-08 20:13:10 +02:00
// Handles a peer announcing an incoming transfer.
// For info transfers of known content that still lacks its info, a receive buffer of
// e.file_size bytes is registered and the transfer is accepted (e.accept = true).
// Returns false for other file kinds, unknown content, already present info or oversized info.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
// only interested in sha1
if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) {
return false;
2023-08-15 16:06:54 +02:00
// TODO: make sure we requested this?
if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) {
// NOTE(review): unlike recv_request, no file_id_size == 20 check is visible here -- confirm SHA1Digest validates the size
SHA1Digest sha1_info_hash {e.file_id, e.file_id_size};
if (!_info_to_content.count(sha1_info_hash)) {
// no idea about this content
return false;
auto ce = _info_to_content.at(sha1_info_hash);
if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data, Components::FT1ChunkSHA1Cache>()) {
// we already have the info (should)
return false;
// TODO: check if e.file_size too large / ask for permission
if (e.file_size > 100*1024*1024) {
// a info size of 100MiB is ~640GiB for a 128KiB chunk size (default)
return false;
// preallocate the buffer that recv_data fills in
[combineIds(e.group_number, e.peer_number)]
.v = ReceivingTransfer::Info{ce, std::vector<uint8_t>(e.file_size)};
e.accept = true;
2023-08-16 01:43:12 +02:00
} else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) {
2023-08-15 16:06:54 +02:00
} else {
2023-08-16 01:43:12 +02:00
assert(false && "unhandled case");
2023-08-15 16:06:54 +02:00
return true;
2023-08-08 20:13:10 +02:00
// Stores a piece of received transfer data.
// Info transfers copy into the preallocated info buffer; chunk transfers write into the content's file.
// Returns false when the (peer, transfer_id) pair is not a known receiving transfer.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
2023-08-15 16:06:54 +02:00
if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
return false;
auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
if (!peer_transfers.count(e.transfer_id)) {
return false;
auto& tv = peer_transfers[e.transfer_id].v;
if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
auto& info_data = std::get<ReceivingTransfer::Info>(tv).info_data;
// bounded by both the incoming data size and the buffer size, excess bytes are dropped
for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) {
info_data[i+e.data_offset] = e.data[i];
2023-08-16 01:43:12 +02:00
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
const auto offset_into_file = std::get<ReceivingTransfer::Chunk>(tv).offset_into_file;
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
auto* file = ce.get<Message::Components::Transfer::File>().get();
assert(file != nullptr);
// TODO: avoid temporary copy
// TODO: check return
// e.data_offset is relative to the chunk, so the chunk's file offset is added
file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size});
2023-08-15 16:06:54 +02:00
} else {
2023-08-16 01:43:12 +02:00
assert(false && "unhandled case");
2023-08-15 16:06:54 +02:00
return true;
2023-08-08 20:13:10 +02:00
// Fills e.data with the next piece of an outgoing transfer.
// Info transfers copy from the stored info buffer; chunk transfers read from the content's file.
// Resets the transfer's inactivity timer. Returns false for unknown (peer, transfer_id) pairs.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
2023-08-09 00:25:08 +02:00
if (!_sending_transfers.count(combineIds(e.group_number, e.peer_number))) {
return false;
auto& peer = _sending_transfers.at(combineIds(e.group_number, e.peer_number));
if (!peer.count(e.transfer_id)) {
return false;
auto& transfer = peer.at(e.transfer_id);
if (std::holds_alternative<SendingTransfer::Info>(transfer.v)) {
auto& info_transfer = std::get<SendingTransfer::Info>(transfer.v);
// never read past the end of the info buffer
for (size_t i = 0; i < e.data_size && (i + e.data_offset) < info_transfer.info_data.size(); i++) {
e.data[i] = info_transfer.info_data[i + e.data_offset];
if (e.data_offset + e.data_size >= info_transfer.info_data.size()) {
// was last read (probably TODO: add transfer destruction event)
2023-08-09 23:02:29 +02:00
} else if (std::holds_alternative<SendingTransfer::Chunk>(transfer.v)) {
auto& chunk_transfer = std::get<SendingTransfer::Chunk>(transfer.v);
2023-08-13 17:12:46 +02:00
// TODO: should we really use file?
const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read(chunk_transfer.offset_into_file + e.data_offset, e.data_size);
2023-08-09 23:02:29 +02:00
// TODO: optimize
// the read may return fewer bytes than requested
for (size_t i = 0; i < e.data_size && i < data.size(); i++) {
e.data[i] = data[i];
2023-08-13 17:12:46 +02:00
chunk_transfer.content.get_or_emplace<Message::Components::Transfer::BytesSent>().total += data.size();
// TODO: add event to propagate to messages
2023-08-09 23:02:29 +02:00
//_rmm.throwEventUpdate(transfer); // should we?
//if (e.data_offset + e.data_size >= *insert chunk size here*) {
//// was last read (probably TODO: add transfer destruction event)
} else {
assert(false && "not implemented?");
2023-08-09 00:25:08 +02:00
2023-08-09 23:02:29 +02:00
transfer.time_since_activity = 0.f;
2023-08-09 00:25:08 +02:00
return true;
2023-08-08 20:13:10 +02:00
2023-08-09 23:49:28 +02:00
// Finalizes a completed receiving transfer.
// Info: verifies the received buffer against the content's FT1InfoSHA1Hash, then stores the
// info data and file info on the content.
// Chunk: reads the chunk back from the file, verifies its hash and marks it as present.
// Returns false for unknown (peer, transfer_id) pairs.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
2023-08-15 16:06:54 +02:00
if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) {
return false;
auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number));
if (!peer_transfers.count(e.transfer_id)) {
return false;
const auto& tv = peer_transfers[e.transfer_id].v;
if (std::holds_alternative<ReceivingTransfer::Info>(tv)) {
auto& info = std::get<ReceivingTransfer::Info>(tv);
auto ce = info.content;
if (ce.any_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Data>()) {
// we already have the info, discard
return true;
// check if data matches hash
auto hash = hash_sha1(info.info_data.data(), info.info_data.size());
if (ce.get<Components::FT1InfoSHA1Hash>().hash != hash) {
std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n";
// requeue info request
return true;
// NOTE(review): tv is a const reference here, so this std::move likely results in a copy -- confirm
const auto& info_data = ce.emplace_or_replace<Components::FT1InfoSHA1Data>(std::move(info.info_data)).data;
// NOTE(review): the statement that parses info_data into ft_info is not visible in this view
auto& ft_info = ce.emplace_or_replace<Components::FT1InfoSHA1>();
{ // file info
// TODO: not overwrite fi? since same?
auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
file_info.total_size = ft_info.file_size;
std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";
// the content no longer needs to be in the want-info queue
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) {
} else if (std::holds_alternative<ReceivingTransfer::Chunk>(tv)) {
2023-08-16 01:43:12 +02:00
const auto offset_into_file = std::get<ReceivingTransfer::Chunk>(tv).offset_into_file;
auto ce = std::get<ReceivingTransfer::Chunk>(tv).content;
const auto& info = ce.get<Components::FT1InfoSHA1>();
auto& cc = ce.get<Components::FT1ChunkSHA1Cache>();
// derive the chunk index from the byte offset
const auto chunk_index = offset_into_file/info.chunk_size;
assert(chunk_index < info.chunks.size());
const auto chunk_size = info.chunkSize(chunk_index);
assert(offset_into_file+chunk_size <= info.file_size);
// read back what recv_data wrote to the file
const auto chunk_data = ce.get<Message::Components::Transfer::File>()->read(offset_into_file, chunk_size);
// check hash of chunk
auto got_hash = hash_sha1(chunk_data.data(), chunk_data.size());
if (info.chunks.at(chunk_index) == got_hash) {
// TODO: check for have all
cc.have_chunk.at(chunk_index) = true;
// good chunk
ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size();
} else {
// bad chunk
// TODO: requeue?
updateMessages(ce); // mostly for received bytes
2023-08-15 16:06:54 +02:00
return true;
2023-08-09 23:49:28 +02:00
// Called when an outgoing transfer finished; for chunk transfers the messages of the
// content are refreshed via updateMessages().
// Returns false for unknown (peer, transfer_id) pairs.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) {
if (!_sending_transfers.count(combineIds(e.group_number, e.peer_number))) {
return false;
auto& peer_transfers = _sending_transfers.at(combineIds(e.group_number, e.peer_number));
if (!peer_transfers.count(e.transfer_id)) {
return false;
2023-08-14 17:08:27 +02:00
const auto& tv = peer_transfers[e.transfer_id].v;
if (std::holds_alternative<SendingTransfer::Chunk>(tv)) {
updateMessages(std::get<SendingTransfer::Chunk>(tv).content); // mostly for sent bytes
2023-08-09 23:49:28 +02:00
return true;
2023-08-11 12:03:01 +02:00
// Handles a received file message of kind HASH_SHA1_INFO: creates a new message entity,
// links it to the (existing or newly created) content for the info hash and throws a
// construct event. Returns false for other file kinds or when no message registry is found.
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO) {
return false;
// current time in milliseconds since the system_clock epoch
uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
const auto self_c = c.get<Contact::Components::Self>().self;
auto* reg_ptr = _rmm.get(c);
if (reg_ptr == nullptr) {
std::cerr << "SHA1_NGCFT1 error: cant find reg\n";
return false;
Message3Registry& reg = *reg_ptr;
// TODO: check for existence, hs or other syncing mechanics might have sent it already (or like, it arrived 2x or whatever)
auto new_msg_e = reg.create();
{ // contact
// from
reg.emplace<Message::Components::ContactFrom>(new_msg_e, c);
// to
reg.emplace<Message::Components::ContactTo>(new_msg_e, c.get<Contact::Components::Parent>().parent);
reg.emplace<Message::Components::ToxGroupMessageID>(new_msg_e, e.message_id);
reg.emplace<Message::Components::Transfer::TagReceiving>(new_msg_e); // add sending?
reg.emplace<Message::Components::TimestampProcessed>(new_msg_e, ts);
//reg.emplace<Components::TimestampWritten>(new_msg_e, 0);
reg.emplace<Message::Components::Timestamp>(new_msg_e, ts); // reactive?
{ // by whom
auto& synced_by = reg.get_or_emplace<Message::Components::SyncedBy>(new_msg_e).list;
2023-08-13 17:12:46 +02:00
// check if content exists
const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
ContentHandle ce;
if (_info_to_content.count(sha1_info_hash)) {
ce = _info_to_content.at(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has existing content\n";
} else {
// first time we see this info hash, create the content entity and register it
ce = {_contentr, _contentr.create()};
_info_to_content[sha1_info_hash] = ce;
std::cout << "SHA1_NGCFT1: new message has new content\n";
//ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
//{ // lookup tables and have
//auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
//cc.have_all = true;
//// skip have vec, since all
//cc.have_count = sha1_info.chunks.size(); // need?
//_info_to_content[sha1_info_hash] = ce;
//for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
//_chunks[sha1_info.chunks[i]] = ce;
//cc.chunk_hash_to_index[sha1_info.chunks[i]] = i;
// TODO: ft1 specific comp
// link content and message in both directions
ce.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
2023-08-17 22:43:16 +02:00
reg_ptr->emplace<Message::Components::Content>(new_msg_e, ce);
2023-08-13 17:12:46 +02:00
2023-08-14 17:08:27 +02:00
// neither a pending re-request timer nor the info itself
if (!ce.all_of<Components::ReRequestInfoTimer>() && !ce.all_of<Components::FT1InfoSHA1>()) {
// TODO: check if already receiving
2023-08-13 17:12:46 +02:00
// TODO: queue info dl
//reg_ptr->emplace<Components::FT1InfoSHA1>(e, sha1_info);
//reg_ptr->emplace<Components::FT1InfoSHA1Data>(e, sha1_info_data); // keep around? or file?
//reg.emplace<Components::FT1InfoSHA1Hash>(new_msg_e, std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size});
if (auto* cc = ce.try_get<Components::FT1ChunkSHA1Cache>(); cc != nullptr && cc->have_all) {
// copy the transfer components the content already has onto the new message
if (ce.all_of<Message::Components::Transfer::FileInfo>()) {
reg_ptr->emplace<Message::Components::Transfer::FileInfo>(new_msg_e, ce.get<Message::Components::Transfer::FileInfo>());
if (ce.all_of<Message::Components::Transfer::FileInfoLocal>()) {
reg_ptr->emplace<Message::Components::Transfer::FileInfoLocal>(new_msg_e, ce.get<Message::Components::Transfer::FileInfoLocal>());
if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
reg_ptr->emplace<Message::Components::Transfer::BytesSent>(new_msg_e, ce.get<Message::Components::Transfer::BytesSent>());
2023-08-11 12:03:01 +02:00
// TODO: queue info/check if we already have info
_rmm.throwEventConstruct(reg, new_msg_e);
return true; // false?
2023-08-08 23:55:12 +02:00
// Sends the file at file_path to contact c under the name file_name.
// 1. opens the file and hashes it chunk by chunk to build the FT1InfoSHA1
// 2. serializes and hashes the info
// 3. creates or updates the content entity for that info hash
// 4. creates the message entity and, for online groups, announces the info hash publicly
// Returns false when the contact is not handled or has no message registry; true otherwise
// (including on file open / self lookup failures, see the error logs).
bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) {
// NOTE(review): the condition of this check is not visible in this view
if (
// TODO: add support of offline queuing
) {
return false;
std::cout << "SHA1_NGCFT1: got sendFilePath()\n";
auto* reg_ptr = _rmm.get(c);
if (reg_ptr == nullptr) {
return false;
2023-08-13 17:12:46 +02:00
// TODO: rw?
// TODO: memory mapped would be king
2023-08-08 23:55:12 +02:00
auto file_impl = std::make_unique<FileRFile>(file_path);
if (!file_impl->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path << "'!\n";
return true;
// get current time unix epoch utc
uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
// 1. build info by hashing all chunks
FT1InfoSHA1 sha1_info;
// build info
sha1_info.file_name = file_name;
sha1_info.file_size = file_impl->_file_size;
{ // build chunks
// HACK: load file fully
// TODO: the speed is truly horrid
const auto file_data = file_impl->read(0, file_impl->_file_size);
size_t i = 0;
// all full sized chunks except the last one
for (; i + sha1_info.chunk_size < file_data.size(); i += sha1_info.chunk_size) {
sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, sha1_info.chunk_size));
// remaining bytes form the last chunk (can be a full sized one)
if (i < file_data.size()) {
sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, file_data.size()-i));
// 2. hash info
std::vector<uint8_t> sha1_info_data;
std::vector<uint8_t> sha1_info_hash;
std::cout << "SHA1_NGCFT1 info is: \n" << sha1_info;
sha1_info_data = sha1_info.toBuffer();
std::cout << "SHA1_NGCFT1 sha1_info size: " << sha1_info_data.size() << "\n";
sha1_info_hash = hash_sha1(sha1_info_data.data(), sha1_info_data.size());
std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n";
2023-08-13 17:12:46 +02:00
// check if content exists
ContentHandle ce;
if (_info_to_content.count(sha1_info_hash)) {
ce = _info_to_content.at(sha1_info_hash);
// TODO: check if content is incomplete and use file instead
// NOTE(review): the bodies of the next two checks are not visible; presumably the missing components get emplaced -- confirm
if (!ce.all_of<Components::FT1InfoSHA1>()) {
if (!ce.all_of<Components::FT1InfoSHA1Data>()) {
// hash has to be set already
// Components::FT1InfoSHA1Hash
{ // lookup tables and have
auto& cc = ce.get_or_emplace<Components::FT1ChunkSHA1Cache>();
cc.have_all = true;
// skip have vec, since all
cc.have_count = sha1_info.chunks.size(); // need?
_info_to_content[sha1_info_hash] = ce;
2023-08-14 17:08:27 +02:00
// iterate back to front, so the lowest index of a repeated chunk hash is written last
for (size_t i = sha1_info.chunks.size(); i > 0; i--) {
_chunks[sha1_info.chunks[i-1]] = ce;
// chunks can have more than 1 index ..., for now, build reverse and have the first index be the real index
cc.chunk_hash_to_index[sha1_info.chunks[i-1]] = i-1;
2023-08-13 17:12:46 +02:00
{ // file info
// TODO: not overwrite fi? since same?
auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
file_info.file_list.emplace_back() = {std::string{file_name}, file_impl->_file_size};
file_info.total_size = file_impl->_file_size;
// cleanup file
if (ce.all_of<Message::Components::Transfer::File>()) {
// replace
if (!ce.all_of<Message::Components::Transfer::BytesSent>()) {
2023-08-15 18:35:07 +02:00
2023-08-14 18:33:17 +02:00
// we dont want the info anymore
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) {
// TODO: we dont want chunks anymore
2023-08-13 17:12:46 +02:00
// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
} else {
// unknown info hash, create a fresh content entity
ce = {_contentr, _contentr.create()};
_info_to_content[sha1_info_hash] = ce;
ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
{ // lookup tables and have
auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
cc.have_all = true;
// skip have vec, since all
cc.have_count = sha1_info.chunks.size(); // need?
_info_to_content[sha1_info_hash] = ce;
// NOTE(review): forward iteration here, so for a repeated chunk hash the last index wins, unlike the existing-content branch -- confirm intended
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
_chunks[sha1_info.chunks[i]] = ce;
cc.chunk_hash_to_index[sha1_info.chunks[i]] = i;
{ // file info
auto& file_info = ce.emplace<Message::Components::Transfer::FileInfo>();
//const auto& file = ce.get<Message::Components::Transfer::File>();
file_info.file_list.emplace_back() = {std::string{file_name}, file_impl->_file_size};
file_info.total_size = file_impl->_file_size;
2023-08-08 23:55:12 +02:00
const auto c_self = _cr.get<Contact::Components::Self>(c).self;
if (!_cr.valid(c_self)) {
std::cerr << "SHA1_NGCFT1 error: failed to get self!\n";
return true;
2023-08-13 17:12:46 +02:00
// create the message and link it with the content
const auto msg_e = reg_ptr->create();
reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
ce.get_or_emplace<Components::Messages>().messages.push_back({*reg_ptr, msg_e});
2023-08-08 23:55:12 +02:00
//reg_ptr->emplace<Message::Components::Transfer::FileKind>(e, file_kind);
// file id would be sha1_info hash or something
//reg_ptr->emplace<Message::Components::Transfer::FileID>(e, file_id);
2023-08-16 01:43:12 +02:00
// remove? done in updateMessages() anyway
2023-08-13 17:12:46 +02:00
if (ce.all_of<Message::Components::Transfer::FileInfo>()) {
reg_ptr->emplace<Message::Components::Transfer::FileInfo>(msg_e, ce.get<Message::Components::Transfer::FileInfo>());
if (ce.all_of<Message::Components::Transfer::FileInfoLocal>()) {
reg_ptr->emplace<Message::Components::Transfer::FileInfoLocal>(msg_e, ce.get<Message::Components::Transfer::FileInfoLocal>());
if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
reg_ptr->emplace<Message::Components::Transfer::BytesSent>(msg_e, ce.get<Message::Components::Transfer::BytesSent>());
2023-08-08 23:55:12 +02:00
// TODO: determine if this is true
2023-08-10 13:01:57 +02:00
if (_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
const uint32_t group_number = _cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
uint32_t message_id = 0;
// TODO: check return
// announce the info hash to the group; message_id is passed to the call and stored on the message afterwards
_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size());
2023-08-13 17:12:46 +02:00
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
2023-08-10 13:01:57 +02:00
// TODO: generalize?
2023-08-13 17:12:46 +02:00
auto& synced_by = reg_ptr->emplace<Message::Components::SyncedBy>(msg_e).list;
2023-08-10 13:01:57 +02:00
} else if (
// non online group
) {
// create msg_id
const uint32_t message_id = randombytes_random();
2023-08-13 17:12:46 +02:00
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
2023-08-10 13:01:57 +02:00
// TODO: generalize?
2023-08-13 17:12:46 +02:00
auto& synced_by = reg_ptr->emplace<Message::Components::SyncedBy>(msg_e).list;
2023-08-10 13:01:57 +02:00
2023-08-13 17:12:46 +02:00
_rmm.throwEventConstruct(*reg_ptr, msg_e);
// TODO: place in iterate?
2023-08-08 23:55:12 +02:00
return true;