very hacky async hashing and message creation

This commit is contained in:
Green Sky 2023-08-21 00:01:14 +02:00
parent ab3c282060
commit bb3f907cd8
No known key found for this signature in database
2 changed files with 268 additions and 198 deletions

View File

@ -1,5 +1,6 @@
#include "./sha1_ngcft1.hpp" #include "./sha1_ngcft1.hpp"
#include <mutex>
#include <solanaceae/toxcore/utils.hpp> #include <solanaceae/toxcore/utils.hpp>
#include <solanaceae/contact/components.hpp> #include <solanaceae/contact/components.hpp>
@ -20,6 +21,7 @@
#include <iostream> #include <iostream>
#include <variant> #include <variant>
#include <filesystem> #include <filesystem>
#include <future>
namespace Message::Components { namespace Message::Components {
@ -245,6 +247,18 @@ SHA1_NGCFT1::SHA1_NGCFT1(
} }
void SHA1_NGCFT1::iterate(float delta) { void SHA1_NGCFT1::iterate(float delta) {
// info builder queue
if (_info_builder_dirty) {
std::lock_guard l{_info_builder_queue_mutex};
_info_builder_dirty = false; // set while holding lock
for (auto& it : _info_builder_queue) {
//it.fn();
it();
}
_info_builder_queue.clear();
}
{ // timers { // timers
// sending transfers // sending transfers
for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) { for (auto peer_it = _sending_transfers.begin(); peer_it != _sending_transfers.end();) {
@ -556,7 +570,7 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
cc.chunk_hash_to_index[info.chunks[i]].push_back(i); cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
} }
if (cc.have_count == info.chunks.size()) { if (cc.have_count >= info.chunks.size()) {
cc.have_all = true; cc.have_all = true;
} }
} else { } else {
@ -1095,22 +1109,39 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
return false; return false;
} }
// TODO: rw?
// TODO: memory mapped would be king
auto file_impl = std::make_unique<FileRFile>(file_path);
if (!file_impl->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path << "'!\n";
return true;
}
// get current time unix epoch utc // get current time unix epoch utc
uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
std::thread(std::move([
// copy everything
self = this,
ts,
c,
reg_ptr,
file_name_ = std::string(file_name),
file_path_ = std::string(file_path)
]() mutable {
// TODO: rw?
// TODO: memory mapped would be king
auto file_impl = std::make_unique<FileRFile>(file_path_);
if (!file_impl->isGood()) {
{
std::lock_guard l{self->_info_builder_queue_mutex};
self->_info_builder_queue.push_back([file_path_](){
// back on iterate thread
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n";
});
self->_info_builder_dirty = true; // still in scope, set before mutex unlock
}
return;
}
// 1. build info by hashing all chunks // 1. build info by hashing all chunks
FT1InfoSHA1 sha1_info; FT1InfoSHA1 sha1_info;
// build info // build info
sha1_info.file_name = file_name; sha1_info.file_name = file_name_;
sha1_info.file_size = file_impl->_file_size; sha1_info.file_size = file_impl->_file_size;
{ // build chunks { // build chunks
@ -1127,6 +1158,27 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
} }
} }
file_impl.reset();
{
std::lock_guard l{self->_info_builder_queue_mutex};
self->_info_builder_queue.push_back(std::move([
self,
ts,
c,
reg_ptr,
file_name_,
file_path_,
sha1_info = std::move(sha1_info)
]() mutable { //
// back on iterate thread
auto file_impl = std::make_unique<FileRFile>(file_path_);
if (!file_impl->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n";
return;
}
// 2. hash info // 2. hash info
std::vector<uint8_t> sha1_info_data; std::vector<uint8_t> sha1_info_data;
std::vector<uint8_t> sha1_info_hash; std::vector<uint8_t> sha1_info_hash;
@ -1139,8 +1191,8 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
// check if content exists // check if content exists
ContentHandle ce; ContentHandle ce;
if (_info_to_content.count(sha1_info_hash)) { if (self->_info_to_content.count(sha1_info_hash)) {
ce = _info_to_content.at(sha1_info_hash); ce = self->_info_to_content.at(sha1_info_hash);
// TODO: check if content is incomplete and use file instead // TODO: check if content is incomplete and use file instead
if (!ce.all_of<Components::FT1InfoSHA1>()) { if (!ce.all_of<Components::FT1InfoSHA1>()) {
@ -1160,10 +1212,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
//cc.have_chunk //cc.have_chunk
cc.have_count = sha1_info.chunks.size(); // need? cc.have_count = sha1_info.chunks.size(); // need?
_info_to_content[sha1_info_hash] = ce; self->_info_to_content[sha1_info_hash] = ce;
cc.chunk_hash_to_index.clear(); // for cpy pst cc.chunk_hash_to_index.clear(); // for cpy pst
for (size_t i = 0; i < sha1_info.chunks.size(); i++) { for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
_chunks[sha1_info.chunks[i]] = ce; self->_chunks[sha1_info.chunks[i]] = ce;
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
} }
} }
@ -1171,10 +1223,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
{ // file info { // file info
// TODO: not overwrite fi? since same? // TODO: not overwrite fi? since same?
auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>(); auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
file_info.file_list.emplace_back() = {std::string{file_name}, file_impl->_file_size}; file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
file_info.total_size = file_impl->_file_size; file_info.total_size = file_impl->_file_size;
ce.emplace_or_replace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path}}); ce.emplace_or_replace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
} }
// cleanup file // cleanup file
@ -1192,23 +1244,23 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
// we dont want the info anymore // we dont want the info anymore
ce.remove<Components::ReRequestInfoTimer>(); ce.remove<Components::ReRequestInfoTimer>();
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) { if (auto it = std::find(self->_queue_content_want_info.begin(), self->_queue_content_want_info.end(), ce); it != self->_queue_content_want_info.end()) {
_queue_content_want_info.erase(it); self->_queue_content_want_info.erase(it);
} }
// TODO: we dont want chunks anymore // TODO: we dont want chunks anymore
// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle) // TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
auto it = _queue_content_want_chunk.begin(); auto it = self->_queue_content_want_chunk.begin();
while ( while (
it != _queue_content_want_chunk.end() && it != self->_queue_content_want_chunk.end() &&
(it = std::find(it, _queue_content_want_chunk.end(), ce)) != _queue_content_want_chunk.end() (it = std::find(it, self->_queue_content_want_chunk.end(), ce)) != self->_queue_content_want_chunk.end()
) { ) {
it = _queue_content_want_chunk.erase(it); it = self->_queue_content_want_chunk.erase(it);
} }
} else { } else {
ce = {_contentr, _contentr.create()}; ce = {self->_contentr, self->_contentr.create()};
_info_to_content[sha1_info_hash] = ce; self->_info_to_content[sha1_info_hash] = ce;
ce.emplace<Components::FT1InfoSHA1>(sha1_info); ce.emplace<Components::FT1InfoSHA1>(sha1_info);
ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file? ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
@ -1220,10 +1272,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
//cc.have_chunk //cc.have_chunk
cc.have_count = sha1_info.chunks.size(); // need? cc.have_count = sha1_info.chunks.size(); // need?
_info_to_content[sha1_info_hash] = ce; self->_info_to_content[sha1_info_hash] = ce;
cc.chunk_hash_to_index.clear(); // for cpy pst cc.chunk_hash_to_index.clear(); // for cpy pst
for (size_t i = 0; i < sha1_info.chunks.size(); i++) { for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
_chunks[sha1_info.chunks[i]] = ce; self->_chunks[sha1_info.chunks[i]] = ce;
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
} }
} }
@ -1231,10 +1283,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
{ // file info { // file info
auto& file_info = ce.emplace<Message::Components::Transfer::FileInfo>(); auto& file_info = ce.emplace<Message::Components::Transfer::FileInfo>();
//const auto& file = ce.get<Message::Components::Transfer::File>(); //const auto& file = ce.get<Message::Components::Transfer::File>();
file_info.file_list.emplace_back() = {std::string{file_name}, file_impl->_file_size}; file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
file_info.total_size = file_impl->_file_size; file_info.total_size = file_impl->_file_size;
ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path}}); ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
} }
ce.emplace<Message::Components::Transfer::File>(std::move(file_impl)); ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
@ -1242,10 +1294,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
ce.emplace<Message::Components::Transfer::BytesSent>(0u); ce.emplace<Message::Components::Transfer::BytesSent>(0u);
} }
const auto c_self = _cr.get<Contact::Components::Self>(c).self; const auto c_self = self->_cr.get<Contact::Components::Self>(c).self;
if (!_cr.valid(c_self)) { if (!self->_cr.valid(c_self)) {
std::cerr << "SHA1_NGCFT1 error: failed to get self!\n"; std::cerr << "SHA1_NGCFT1 error: failed to get self!\n";
return true; return;
} }
const auto msg_e = reg_ptr->create(); const auto msg_e = reg_ptr->create();
@ -1277,12 +1329,12 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
// TODO: determine if this is true // TODO: determine if this is true
//reg_ptr->emplace<Message::Components::Transfer::TagPaused>(e); //reg_ptr->emplace<Message::Components::Transfer::TagPaused>(e);
if (_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) { if (self->_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
const uint32_t group_number = _cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number; const uint32_t group_number = self->_cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
uint32_t message_id = 0; uint32_t message_id = 0;
// TODO: check return // TODO: check return
_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size()); self->_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size());
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id); reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
// TODO: generalize? // TODO: generalize?
@ -1290,7 +1342,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
synced_by.emplace(c_self); synced_by.emplace(c_self);
} else if ( } else if (
// non online group // non online group
_cr.any_of<Contact::Components::ToxGroupPersistent>(c) self->_cr.any_of<Contact::Components::ToxGroupPersistent>(c)
) { ) {
// create msg_id // create msg_id
const uint32_t message_id = randombytes_random(); const uint32_t message_id = randombytes_random();
@ -1301,10 +1353,15 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
synced_by.emplace(c_self); synced_by.emplace(c_self);
} }
_rmm.throwEventConstruct(*reg_ptr, msg_e); self->_rmm.throwEventConstruct(*reg_ptr, msg_e);
// TODO: place in iterate? // TODO: place in iterate?
updateMessages(ce); self->updateMessages(ce);
}));
self->_info_builder_dirty = true; // still in scope, set before mutex unlock
}
})).detach();
return true; return true;
} }

View File

@ -16,6 +16,9 @@
#include <variant> #include <variant>
#include <random> #include <random>
#include <atomic>
#include <mutex>
#include <list>
enum class Content : uint32_t {}; enum class Content : uint32_t {};
using ContentRegistry = entt::basic_registry<Content>; using ContentRegistry = entt::basic_registry<Content>;
@ -93,6 +96,16 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
std::deque<ContentHandle> _queue_content_want_info; std::deque<ContentHandle> _queue_content_want_info;
std::deque<ContentHandle> _queue_content_want_chunk; std::deque<ContentHandle> _queue_content_want_chunk;
std::atomic_bool _info_builder_dirty {false};
std::mutex _info_builder_queue_mutex;
//struct InfoBuilderEntry {
//// called on completion on the iterate thread
//// (owning)
//std::function<void(void)> fn;
//};
using InfoBuilderEntry = std::function<void(void)>;
std::list<InfoBuilderEntry> _info_builder_queue;
static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number); static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number);
void updateMessages(ContentHandle ce); void updateMessages(ContentHandle ce);
@ -105,7 +118,7 @@ class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI {
size_t _max_concurrent_in {4}; size_t _max_concurrent_in {4};
size_t _max_concurrent_out {6}; size_t _max_concurrent_out {6};
// TODO: probably also includes running transfers rn (meh) // TODO: probably also includes running transfers rn (meh)
size_t _max_pending_requests {16}; // per content size_t _max_pending_requests {32}; // per content
public: public:
SHA1_NGCFT1( SHA1_NGCFT1(