Compare commits
2 Commits
f730844771
...
54ace9d0b2
Author | SHA1 | Date | |
---|---|---|---|
54ace9d0b2 | |||
e50e74e12f |
@ -50,6 +50,9 @@ add_library(solanaceae_sha1_ngcft1
|
|||||||
./solanaceae/ngc_ft1_sha1/file_constructor.hpp
|
./solanaceae/ngc_ft1_sha1/file_constructor.hpp
|
||||||
./solanaceae/ngc_ft1_sha1/file_constructor.cpp
|
./solanaceae/ngc_ft1_sha1/file_constructor.cpp
|
||||||
|
|
||||||
|
./solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.hpp
|
||||||
|
./solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.cpp
|
||||||
|
|
||||||
./solanaceae/ngc_ft1_sha1/hash_utils.hpp
|
./solanaceae/ngc_ft1_sha1/hash_utils.hpp
|
||||||
./solanaceae/ngc_ft1_sha1/hash_utils.cpp
|
./solanaceae/ngc_ft1_sha1/hash_utils.cpp
|
||||||
|
|
||||||
|
253
solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.cpp
Normal file
253
solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.cpp
Normal file
@ -0,0 +1,253 @@
|
|||||||
|
#include "./sha1_mapped_filesystem.hpp"
|
||||||
|
|
||||||
|
#include <solanaceae/object_store/meta_components.hpp>
|
||||||
|
|
||||||
|
#include "../file_constructor.hpp"
|
||||||
|
#include "../ft1_sha1_info.hpp"
|
||||||
|
#include "../hash_utils.hpp"
|
||||||
|
#include "../components.hpp"
|
||||||
|
|
||||||
|
#include <solanaceae/util/utils.hpp>
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <mutex>
|
||||||
|
#include <list>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
namespace Backends {
|
||||||
|
|
||||||
|
struct SHA1MappedFilesystem_InfoBuilderState {
|
||||||
|
std::atomic_bool info_builder_dirty {false};
|
||||||
|
std::mutex info_builder_queue_mutex;
|
||||||
|
using InfoBuilderEntry = std::function<void(void)>;
|
||||||
|
std::list<InfoBuilderEntry> info_builder_queue;
|
||||||
|
};
|
||||||
|
|
||||||
|
SHA1MappedFilesystem::SHA1MappedFilesystem(
|
||||||
|
ObjectStore2& os
|
||||||
|
) : StorageBackendI::StorageBackendI(os), _ibs(std::make_unique<SHA1MappedFilesystem_InfoBuilderState>()) {
|
||||||
|
}
|
||||||
|
|
||||||
|
SHA1MappedFilesystem::~SHA1MappedFilesystem(void) {
|
||||||
|
}
|
||||||
|
|
||||||
|
void SHA1MappedFilesystem::tick(void) {
|
||||||
|
if (_ibs->info_builder_dirty) {
|
||||||
|
std::lock_guard l{_ibs->info_builder_queue_mutex};
|
||||||
|
_ibs->info_builder_dirty = false; // set while holding lock
|
||||||
|
|
||||||
|
for (auto& it : _ibs->info_builder_queue) {
|
||||||
|
it();
|
||||||
|
}
|
||||||
|
_ibs->info_builder_queue.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjectHandle SHA1MappedFilesystem::newObject(ByteSpan id) {
|
||||||
|
ObjectHandle o{_os.registry(), _os.registry().create()};
|
||||||
|
|
||||||
|
o.emplace<ObjComp::Ephemeral::Backend>(this);
|
||||||
|
o.emplace<ObjComp::ID>(std::vector<uint8_t>{id});
|
||||||
|
//o.emplace<ObjComp::Ephemeral::FilePath>(object_file_path.generic_u8string());
|
||||||
|
|
||||||
|
_os.throwEventConstruct(o);
|
||||||
|
|
||||||
|
return o;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_view file_path, std::function<void(ObjectHandle o)>&& cb) {
|
||||||
|
std::thread(std::move([
|
||||||
|
this,
|
||||||
|
ibs = _ibs.get(),
|
||||||
|
cb = std::move(cb),
|
||||||
|
file_name_ = std::string(file_name),
|
||||||
|
file_path_ = std::string(file_path)
|
||||||
|
]() mutable {
|
||||||
|
// 0. open and fail
|
||||||
|
std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(file_path_, -1);
|
||||||
|
if (!file_impl->isGood()) {
|
||||||
|
{
|
||||||
|
std::lock_guard l{ibs->info_builder_queue_mutex};
|
||||||
|
ibs->info_builder_queue.push_back([file_path_](){
|
||||||
|
// back on iterate thread
|
||||||
|
|
||||||
|
std::cerr << "SHA1MF error: failed opening file '" << file_path_ << "'!\n";
|
||||||
|
});
|
||||||
|
ibs->info_builder_dirty = true; // still in scope, set before mutex unlock
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. build info by hashing all chunks
|
||||||
|
FT1InfoSHA1 sha1_info;
|
||||||
|
// build info
|
||||||
|
sha1_info.file_name = file_name_;
|
||||||
|
sha1_info.file_size = file_impl->_file_size; // TODO: remove the reliance on implementation details
|
||||||
|
|
||||||
|
{ // build chunks
|
||||||
|
// HACK: load file fully
|
||||||
|
// ... its only a hack if its not memory mapped, but reading in chunk_sized chunks is probably a good idea anyway
|
||||||
|
const auto file_data = file_impl->read(file_impl->_file_size, 0);
|
||||||
|
size_t i = 0;
|
||||||
|
for (; i + sha1_info.chunk_size < file_data.size; i += sha1_info.chunk_size) {
|
||||||
|
sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, sha1_info.chunk_size));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i < file_data.size) {
|
||||||
|
sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, file_data.size-i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
file_impl.reset();
|
||||||
|
|
||||||
|
std::lock_guard l{ibs->info_builder_queue_mutex};
|
||||||
|
ibs->info_builder_queue.push_back(std::move([
|
||||||
|
this,
|
||||||
|
file_name_,
|
||||||
|
file_path_,
|
||||||
|
sha1_info = std::move(sha1_info),
|
||||||
|
cb = std::move(cb)
|
||||||
|
]() mutable { //
|
||||||
|
// executed on iterate thread
|
||||||
|
|
||||||
|
// reopen, cant move, since std::function needs to be copy consturctable (meh)
|
||||||
|
std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(file_path_, sha1_info.file_size);
|
||||||
|
if (!file_impl->isGood()) {
|
||||||
|
std::cerr << "SHA1MF error: failed opening file '" << file_path_ << "'!\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. hash info
|
||||||
|
std::vector<uint8_t> sha1_info_data;
|
||||||
|
std::vector<uint8_t> sha1_info_hash;
|
||||||
|
|
||||||
|
std::cout << "SHA1MF info is: \n" << sha1_info;
|
||||||
|
sha1_info_data = sha1_info.toBuffer();
|
||||||
|
std::cout << "SHA1MF sha1_info size: " << sha1_info_data.size() << "\n";
|
||||||
|
sha1_info_hash = hash_sha1(sha1_info_data.data(), sha1_info_data.size());
|
||||||
|
std::cout << "SHA1MF sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n";
|
||||||
|
|
||||||
|
ObjectHandle o;
|
||||||
|
// check if content exists
|
||||||
|
// TODO: store "info_to_content" in reg/backend, for better lookup speed
|
||||||
|
// rn ok, bc this is rare
|
||||||
|
for (const auto& [it_ov, it_ih] : _os.registry().view<Components::FT1InfoSHA1Hash>().each()) {
|
||||||
|
if (it_ih.hash == sha1_info_hash) {
|
||||||
|
o = {_os.registry(), it_ov};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//if (self->info_to_content.count(sha1_info_hash)) {
|
||||||
|
if (static_cast<bool>(o)) {
|
||||||
|
//ce = self->info_to_content.at(sha1_info_hash);
|
||||||
|
|
||||||
|
// TODO: check if content is incomplete and use file instead
|
||||||
|
if (!o.all_of<Components::FT1InfoSHA1>()) {
|
||||||
|
o.emplace<Components::FT1InfoSHA1>(sha1_info);
|
||||||
|
}
|
||||||
|
if (!o.all_of<Components::FT1InfoSHA1Data>()) {
|
||||||
|
o.emplace<Components::FT1InfoSHA1Data>(sha1_info_data);
|
||||||
|
}
|
||||||
|
|
||||||
|
// hash has to be set already
|
||||||
|
// Components::FT1InfoSHA1Hash
|
||||||
|
|
||||||
|
{ // lookup tables and have
|
||||||
|
auto& cc = o.get_or_emplace<Components::FT1ChunkSHA1Cache>();
|
||||||
|
cc.have_all = true;
|
||||||
|
// skip have vec, since all
|
||||||
|
//cc.have_chunk
|
||||||
|
cc.have_count = sha1_info.chunks.size(); // need?
|
||||||
|
|
||||||
|
//self->_info_to_content[sha1_info_hash] = ce;
|
||||||
|
cc.chunk_hash_to_index.clear(); // for cpy pst
|
||||||
|
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
|
||||||
|
//self->_chunks[sha1_info.chunks[i]] = ce;
|
||||||
|
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // file info
|
||||||
|
// TODO: not overwrite fi? since same?
|
||||||
|
auto& file_info = o.emplace_or_replace<Message::Components::Transfer::FileInfo>();
|
||||||
|
file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
|
||||||
|
file_info.total_size = file_impl->_file_size;
|
||||||
|
|
||||||
|
o.emplace_or_replace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
|
||||||
|
}
|
||||||
|
|
||||||
|
// hmmm
|
||||||
|
o.remove<Message::Components::Transfer::TagPaused>();
|
||||||
|
|
||||||
|
// we dont want the info anymore
|
||||||
|
o.remove<Components::ReRequestInfoTimer>();
|
||||||
|
} else {
|
||||||
|
o = newObject(ByteSpan{sha1_info_hash});
|
||||||
|
|
||||||
|
o.emplace<Components::FT1InfoSHA1>(sha1_info);
|
||||||
|
o.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
|
||||||
|
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
|
||||||
|
{ // lookup tables and have
|
||||||
|
auto& cc = o.emplace<Components::FT1ChunkSHA1Cache>();
|
||||||
|
cc.have_all = true;
|
||||||
|
// skip have vec, since all
|
||||||
|
cc.have_count = sha1_info.chunks.size(); // need?
|
||||||
|
|
||||||
|
cc.chunk_hash_to_index.clear(); // for cpy pst
|
||||||
|
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
|
||||||
|
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // file info
|
||||||
|
auto& file_info = o.emplace<Message::Components::Transfer::FileInfo>();
|
||||||
|
file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
|
||||||
|
file_info.total_size = file_impl->_file_size;
|
||||||
|
|
||||||
|
o.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
o.emplace_or_replace<Message::Components::Transfer::File>(std::move(file_impl));
|
||||||
|
|
||||||
|
// TODO: replace with transfers stats
|
||||||
|
if (!o.all_of<Message::Components::Transfer::BytesSent>()) {
|
||||||
|
o.emplace<Message::Components::Transfer::BytesSent>(0u);
|
||||||
|
}
|
||||||
|
|
||||||
|
cb(o);
|
||||||
|
|
||||||
|
// TODO: earlier?
|
||||||
|
_os.throwEventUpdate(o);
|
||||||
|
}));
|
||||||
|
ibs->info_builder_dirty = true; // still in scope, set before mutex unlock
|
||||||
|
})).detach();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<File2I> SHA1MappedFilesystem::file2(Object ov, FILE2_FLAGS flags) {
|
||||||
|
ObjectHandle o{_os.registry(), ov};
|
||||||
|
|
||||||
|
if (!static_cast<bool>(o)) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!o.all_of<Message::Components::Transfer::FileInfoLocal>()) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto& file_list = o.get<Message::Components::Transfer::FileInfoLocal>().file_list;
|
||||||
|
if (file_list.empty()) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto res = construct_file2_rw_mapped(file_list.front(), -1);
|
||||||
|
if (!res || !res->isGood()) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // Backends
|
||||||
|
|
39
solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.hpp
Normal file
39
solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.hpp
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <solanaceae/object_store/object_store.hpp>
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <string_view>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
namespace Backends {
|
||||||
|
|
||||||
|
// fwd to hide the threading headers
|
||||||
|
struct SHA1MappedFilesystem_InfoBuilderState;
|
||||||
|
|
||||||
|
struct SHA1MappedFilesystem : public StorageBackendI {
|
||||||
|
std::unique_ptr<SHA1MappedFilesystem_InfoBuilderState> _ibs;
|
||||||
|
|
||||||
|
SHA1MappedFilesystem(
|
||||||
|
ObjectStore2& os
|
||||||
|
);
|
||||||
|
~SHA1MappedFilesystem(void);
|
||||||
|
|
||||||
|
// pull from info builder queue
|
||||||
|
// call from main thread (os thread?) often
|
||||||
|
void tick(void);
|
||||||
|
|
||||||
|
ObjectHandle newObject(ByteSpan id) override;
|
||||||
|
|
||||||
|
// performs async file hashing
|
||||||
|
// create message in cb
|
||||||
|
void newFromFile(std::string_view file_name, std::string_view file_path, std::function<void(ObjectHandle o)>&& cb/*, bool merge_preexisting = false*/);
|
||||||
|
|
||||||
|
// might return pre-existing?
|
||||||
|
ObjectHandle newFromInfoHash(ByteSpan info_hash);
|
||||||
|
|
||||||
|
std::unique_ptr<File2I> file2(Object o, FILE2_FLAGS flags); // default does nothing
|
||||||
|
};
|
||||||
|
|
||||||
|
} // Backends
|
||||||
|
|
@ -29,7 +29,6 @@
|
|||||||
|
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
#include <thread>
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
namespace Message::Components {
|
namespace Message::Components {
|
||||||
@ -193,7 +192,8 @@ SHA1_NGCFT1::SHA1_NGCFT1(
|
|||||||
_nft(nft),
|
_nft(nft),
|
||||||
_tcm(tcm),
|
_tcm(tcm),
|
||||||
_tep(tep),
|
_tep(tep),
|
||||||
_neep(neep)
|
_neep(neep),
|
||||||
|
_mfb(os)
|
||||||
{
|
{
|
||||||
// TODO: also create and destroy
|
// TODO: also create and destroy
|
||||||
_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
|
_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
|
||||||
@ -221,17 +221,7 @@ SHA1_NGCFT1::SHA1_NGCFT1(
|
|||||||
|
|
||||||
float SHA1_NGCFT1::iterate(float delta) {
|
float SHA1_NGCFT1::iterate(float delta) {
|
||||||
//std::cerr << "---------- new tick ----------\n";
|
//std::cerr << "---------- new tick ----------\n";
|
||||||
// info builder queue
|
_mfb.tick(); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast
|
||||||
if (_info_builder_dirty) {
|
|
||||||
std::lock_guard l{_info_builder_queue_mutex};
|
|
||||||
_info_builder_dirty = false; // set while holding lock
|
|
||||||
|
|
||||||
for (auto& it : _info_builder_queue) {
|
|
||||||
//it.fn();
|
|
||||||
it();
|
|
||||||
}
|
|
||||||
_info_builder_queue.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
entt::dense_map<Contact3, size_t> peer_open_requests;
|
entt::dense_map<Contact3, size_t> peer_open_requests;
|
||||||
|
|
||||||
@ -440,6 +430,89 @@ float SHA1_NGCFT1::iterate(float delta) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// gets called back on main thread after a "new" file info got built on a different thread
|
||||||
|
void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts) {
|
||||||
|
// sanity
|
||||||
|
if (!o.all_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Hash>()) {
|
||||||
|
assert(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// update content lookup
|
||||||
|
const auto& info_hash = o.get<Components::FT1InfoSHA1Hash>().hash;
|
||||||
|
_info_to_content[info_hash] = o;
|
||||||
|
|
||||||
|
// update chunk lookup
|
||||||
|
const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
|
||||||
|
const auto& info = o.get<Components::FT1InfoSHA1>();
|
||||||
|
for (size_t i = 0; i < info.chunks.size(); i++) {
|
||||||
|
_chunks[info.chunks[i]] = o;
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove from info request queue
|
||||||
|
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), o); it != _queue_content_want_info.end()) {
|
||||||
|
_queue_content_want_info.erase(it);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: we dont want chunks anymore
|
||||||
|
// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
|
||||||
|
|
||||||
|
// something happend, update all chunk pickers
|
||||||
|
if (o.all_of<Components::SuspectedParticipants>()) {
|
||||||
|
for (const auto& pcv : o.get<Components::SuspectedParticipants>().participants) {
|
||||||
|
Contact3Handle pch{_cr, pcv};
|
||||||
|
assert(static_cast<bool>(pch));
|
||||||
|
pch.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// create message
|
||||||
|
const auto c_self = _cr.get<Contact::Components::Self>(c).self;
|
||||||
|
if (!_cr.valid(c_self)) {
|
||||||
|
std::cerr << "SHA1_NGCFT1 error: failed to get self!\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto msg_e = reg_ptr->create();
|
||||||
|
reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
|
||||||
|
reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
|
||||||
|
reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
|
||||||
|
reg_ptr->emplace<Message::Components::Read>(msg_e, ts);
|
||||||
|
|
||||||
|
reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e);
|
||||||
|
reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
|
||||||
|
|
||||||
|
o.get_or_emplace<Components::Messages>().messages.push_back({*reg_ptr, msg_e});
|
||||||
|
|
||||||
|
//reg_ptr->emplace<Message::Components::Transfer::FileKind>(e, file_kind);
|
||||||
|
// file id would be sha1_info hash or something
|
||||||
|
//reg_ptr->emplace<Message::Components::Transfer::FileID>(e, file_id);
|
||||||
|
|
||||||
|
if (_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
|
||||||
|
const uint32_t group_number = _cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
|
||||||
|
uint32_t message_id = 0;
|
||||||
|
|
||||||
|
// TODO: check return
|
||||||
|
_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), info_hash.data(), info_hash.size());
|
||||||
|
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
|
||||||
|
} else if (
|
||||||
|
// non online group
|
||||||
|
_cr.any_of<Contact::Components::ToxGroupPersistent>(c)
|
||||||
|
) {
|
||||||
|
// create msg_id
|
||||||
|
const uint32_t message_id = randombytes_random();
|
||||||
|
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
|
||||||
|
} // TODO: else private message
|
||||||
|
|
||||||
|
reg_ptr->get_or_emplace<Message::Components::SyncedBy>(msg_e).ts.try_emplace(c_self, ts);
|
||||||
|
reg_ptr->get_or_emplace<Message::Components::ReceivedBy>(msg_e).ts.try_emplace(c_self, ts);
|
||||||
|
|
||||||
|
_rmm.throwEventConstruct(*reg_ptr, msg_e);
|
||||||
|
|
||||||
|
// TODO: place in iterate?
|
||||||
|
updateMessages(o);
|
||||||
|
}
|
||||||
|
|
||||||
bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
|
bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
|
||||||
// see tox_transfer_manager.cpp for reference
|
// see tox_transfer_manager.cpp for reference
|
||||||
if (!e.e.all_of<Message::Components::Transfer::ActionAccept, Message::Components::Content>()) {
|
if (!e.e.all_of<Message::Components::Transfer::ActionAccept, Message::Components::Content>()) {
|
||||||
@ -990,6 +1063,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (!cc.have_all) { // debug print self have set
|
if (!cc.have_all) { // debug print self have set
|
||||||
std::cout << "DEBUG print have bitset: s:" << cc.have_chunk.size_bits();
|
std::cout << "DEBUG print have bitset: s:" << cc.have_chunk.size_bits();
|
||||||
for (size_t i = 0; i < cc.have_chunk.size_bytes(); i++) {
|
for (size_t i = 0; i < cc.have_chunk.size_bytes(); i++) {
|
||||||
@ -1001,6 +1075,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
|||||||
}
|
}
|
||||||
printf("\n");
|
printf("\n");
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
} else {
|
} else {
|
||||||
std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
|
std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
|
||||||
}
|
}
|
||||||
@ -1176,252 +1251,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
|
|||||||
// get current time unix epoch utc
|
// get current time unix epoch utc
|
||||||
uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
std::thread(std::move([
|
_mfb.newFromFile(
|
||||||
// copy everything
|
file_name, file_path,
|
||||||
self = this,
|
[this, reg_ptr, c, ts](ObjectHandle o) { onSendFileHashFinished(o, reg_ptr, c, ts); }
|
||||||
ts,
|
);
|
||||||
c,
|
|
||||||
reg_ptr,
|
|
||||||
file_name_ = std::string(file_name),
|
|
||||||
file_path_ = std::string(file_path)
|
|
||||||
]() mutable {
|
|
||||||
std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(file_path_, -1);
|
|
||||||
if (!file_impl->isGood()) {
|
|
||||||
{
|
|
||||||
std::lock_guard l{self->_info_builder_queue_mutex};
|
|
||||||
self->_info_builder_queue.push_back([file_path_](){
|
|
||||||
// back on iterate thread
|
|
||||||
|
|
||||||
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n";
|
|
||||||
});
|
|
||||||
self->_info_builder_dirty = true; // still in scope, set before mutex unlock
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 1. build info by hashing all chunks
|
|
||||||
|
|
||||||
FT1InfoSHA1 sha1_info;
|
|
||||||
// build info
|
|
||||||
sha1_info.file_name = file_name_;
|
|
||||||
sha1_info.file_size = file_impl->_file_size; // TODO: remove the reliance on implementation details
|
|
||||||
|
|
||||||
{ // build chunks
|
|
||||||
// HACK: load file fully
|
|
||||||
// ... its only a hack if its not memory mapped, but reading in chunk_sized chunks is probably a good idea anyway
|
|
||||||
const auto file_data = file_impl->read(file_impl->_file_size, 0);
|
|
||||||
size_t i = 0;
|
|
||||||
for (; i + sha1_info.chunk_size < file_data.size; i += sha1_info.chunk_size) {
|
|
||||||
sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, sha1_info.chunk_size));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (i < file_data.size) {
|
|
||||||
sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, file_data.size-i));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
file_impl.reset();
|
|
||||||
|
|
||||||
{
|
|
||||||
std::lock_guard l{self->_info_builder_queue_mutex};
|
|
||||||
self->_info_builder_queue.push_back(std::move([
|
|
||||||
self,
|
|
||||||
ts,
|
|
||||||
c,
|
|
||||||
reg_ptr,
|
|
||||||
file_name_,
|
|
||||||
file_path_,
|
|
||||||
sha1_info = std::move(sha1_info)
|
|
||||||
]() mutable { //
|
|
||||||
// back on iterate thread
|
|
||||||
|
|
||||||
std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(file_path_, sha1_info.file_size);
|
|
||||||
if (!file_impl->isGood()) {
|
|
||||||
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. hash info
|
|
||||||
std::vector<uint8_t> sha1_info_data;
|
|
||||||
std::vector<uint8_t> sha1_info_hash;
|
|
||||||
|
|
||||||
std::cout << "SHA1_NGCFT1 info is: \n" << sha1_info;
|
|
||||||
sha1_info_data = sha1_info.toBuffer();
|
|
||||||
std::cout << "SHA1_NGCFT1 sha1_info size: " << sha1_info_data.size() << "\n";
|
|
||||||
sha1_info_hash = hash_sha1(sha1_info_data.data(), sha1_info_data.size());
|
|
||||||
std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n";
|
|
||||||
|
|
||||||
// check if content exists
|
|
||||||
ObjectHandle ce;
|
|
||||||
if (self->_info_to_content.count(sha1_info_hash)) {
|
|
||||||
ce = self->_info_to_content.at(sha1_info_hash);
|
|
||||||
|
|
||||||
// TODO: check if content is incomplete and use file instead
|
|
||||||
if (!ce.all_of<Components::FT1InfoSHA1>()) {
|
|
||||||
ce.emplace<Components::FT1InfoSHA1>(sha1_info);
|
|
||||||
}
|
|
||||||
if (!ce.all_of<Components::FT1InfoSHA1Data>()) {
|
|
||||||
ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data);
|
|
||||||
}
|
|
||||||
|
|
||||||
// hash has to be set already
|
|
||||||
// Components::FT1InfoSHA1Hash
|
|
||||||
|
|
||||||
{ // lookup tables and have
|
|
||||||
auto& cc = ce.get_or_emplace<Components::FT1ChunkSHA1Cache>();
|
|
||||||
cc.have_all = true;
|
|
||||||
// skip have vec, since all
|
|
||||||
//cc.have_chunk
|
|
||||||
cc.have_count = sha1_info.chunks.size(); // need?
|
|
||||||
|
|
||||||
self->_info_to_content[sha1_info_hash] = ce;
|
|
||||||
cc.chunk_hash_to_index.clear(); // for cpy pst
|
|
||||||
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
|
|
||||||
self->_chunks[sha1_info.chunks[i]] = ce;
|
|
||||||
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
{ // file info
|
|
||||||
// TODO: not overwrite fi? since same?
|
|
||||||
auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
|
|
||||||
file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
|
|
||||||
file_info.total_size = file_impl->_file_size;
|
|
||||||
|
|
||||||
ce.emplace_or_replace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
|
|
||||||
}
|
|
||||||
|
|
||||||
// cleanup file
|
|
||||||
if (ce.all_of<Message::Components::Transfer::File>()) {
|
|
||||||
// replace
|
|
||||||
ce.remove<Message::Components::Transfer::File>();
|
|
||||||
}
|
|
||||||
ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
|
|
||||||
|
|
||||||
if (!ce.all_of<Message::Components::Transfer::BytesSent>()) {
|
|
||||||
ce.emplace<Message::Components::Transfer::BytesSent>(0u);
|
|
||||||
}
|
|
||||||
|
|
||||||
ce.remove<Message::Components::Transfer::TagPaused>();
|
|
||||||
|
|
||||||
// we dont want the info anymore
|
|
||||||
ce.remove<Components::ReRequestInfoTimer>();
|
|
||||||
if (auto it = std::find(self->_queue_content_want_info.begin(), self->_queue_content_want_info.end(), ce); it != self->_queue_content_want_info.end()) {
|
|
||||||
self->_queue_content_want_info.erase(it);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: we dont want chunks anymore
|
|
||||||
// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
|
|
||||||
} else {
|
|
||||||
// TODO: backend
|
|
||||||
ce = {self->_os.registry(), self->_os.registry().create()};
|
|
||||||
self->_info_to_content[sha1_info_hash] = ce;
|
|
||||||
|
|
||||||
ce.emplace<Components::FT1InfoSHA1>(sha1_info);
|
|
||||||
ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
|
|
||||||
ce.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
|
|
||||||
{ // lookup tables and have
|
|
||||||
auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
|
|
||||||
cc.have_all = true;
|
|
||||||
// skip have vec, since all
|
|
||||||
//cc.have_chunk
|
|
||||||
cc.have_count = sha1_info.chunks.size(); // need?
|
|
||||||
|
|
||||||
self->_info_to_content[sha1_info_hash] = ce;
|
|
||||||
cc.chunk_hash_to_index.clear(); // for cpy pst
|
|
||||||
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
|
|
||||||
self->_chunks[sha1_info.chunks[i]] = ce;
|
|
||||||
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
{ // file info
|
|
||||||
auto& file_info = ce.emplace<Message::Components::Transfer::FileInfo>();
|
|
||||||
//const auto& file = ce.get<Message::Components::Transfer::File>();
|
|
||||||
file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
|
|
||||||
file_info.total_size = file_impl->_file_size;
|
|
||||||
|
|
||||||
ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
|
|
||||||
}
|
|
||||||
|
|
||||||
ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
|
|
||||||
|
|
||||||
ce.emplace<Message::Components::Transfer::BytesSent>(0u);
|
|
||||||
}
|
|
||||||
|
|
||||||
// something happend, update all chunk pickers
|
|
||||||
if (ce.all_of<Components::SuspectedParticipants>()) {
|
|
||||||
for (const auto& pcv : ce.get<Components::SuspectedParticipants>().participants) {
|
|
||||||
Contact3Handle pch{self->_cr, pcv};
|
|
||||||
assert(static_cast<bool>(pch));
|
|
||||||
pch.emplace_or_replace<ChunkPickerUpdateTag>();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto c_self = self->_cr.get<Contact::Components::Self>(c).self;
|
|
||||||
if (!self->_cr.valid(c_self)) {
|
|
||||||
std::cerr << "SHA1_NGCFT1 error: failed to get self!\n";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto msg_e = reg_ptr->create();
|
|
||||||
reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
|
|
||||||
reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
|
|
||||||
reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
|
|
||||||
reg_ptr->emplace<Message::Components::Read>(msg_e, ts);
|
|
||||||
|
|
||||||
reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e);
|
|
||||||
reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
|
|
||||||
|
|
||||||
ce.get_or_emplace<Components::Messages>().messages.push_back({*reg_ptr, msg_e});
|
|
||||||
|
|
||||||
|
|
||||||
//reg_ptr->emplace<Message::Components::Transfer::FileKind>(e, file_kind);
|
|
||||||
// file id would be sha1_info hash or something
|
|
||||||
//reg_ptr->emplace<Message::Components::Transfer::FileID>(e, file_id);
|
|
||||||
|
|
||||||
// remove? done in updateMessages() anyway
|
|
||||||
if (ce.all_of<Message::Components::Transfer::FileInfo>()) {
|
|
||||||
reg_ptr->emplace<Message::Components::Transfer::FileInfo>(msg_e, ce.get<Message::Components::Transfer::FileInfo>());
|
|
||||||
}
|
|
||||||
if (ce.all_of<Message::Components::Transfer::FileInfoLocal>()) {
|
|
||||||
reg_ptr->emplace<Message::Components::Transfer::FileInfoLocal>(msg_e, ce.get<Message::Components::Transfer::FileInfoLocal>());
|
|
||||||
}
|
|
||||||
if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
|
|
||||||
reg_ptr->emplace<Message::Components::Transfer::BytesSent>(msg_e, ce.get<Message::Components::Transfer::BytesSent>());
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: determine if this is true
|
|
||||||
//reg_ptr->emplace<Message::Components::Transfer::TagPaused>(e);
|
|
||||||
|
|
||||||
if (self->_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
|
|
||||||
const uint32_t group_number = self->_cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
|
|
||||||
uint32_t message_id = 0;
|
|
||||||
|
|
||||||
// TODO: check return
|
|
||||||
self->_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size());
|
|
||||||
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
|
|
||||||
} else if (
|
|
||||||
// non online group
|
|
||||||
self->_cr.any_of<Contact::Components::ToxGroupPersistent>(c)
|
|
||||||
) {
|
|
||||||
// create msg_id
|
|
||||||
const uint32_t message_id = randombytes_random();
|
|
||||||
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
reg_ptr->get_or_emplace<Message::Components::SyncedBy>(msg_e).ts.try_emplace(c_self, ts);
|
|
||||||
reg_ptr->get_or_emplace<Message::Components::ReceivedBy>(msg_e).ts.try_emplace(c_self, ts);
|
|
||||||
|
|
||||||
self->_rmm.throwEventConstruct(*reg_ptr, msg_e);
|
|
||||||
|
|
||||||
// TODO: place in iterate?
|
|
||||||
self->updateMessages(ce);
|
|
||||||
|
|
||||||
}));
|
|
||||||
self->_info_builder_dirty = true; // still in scope, set before mutex unlock
|
|
||||||
}
|
|
||||||
})).detach();
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -13,12 +13,12 @@
|
|||||||
#include "./sending_transfers.hpp"
|
#include "./sending_transfers.hpp"
|
||||||
#include "./receiving_transfers.hpp"
|
#include "./receiving_transfers.hpp"
|
||||||
|
|
||||||
|
#include "./backends/sha1_mapped_filesystem.hpp"
|
||||||
|
|
||||||
#include <entt/container/dense_map.hpp>
|
#include <entt/container/dense_map.hpp>
|
||||||
|
|
||||||
#include <random>
|
#include <random>
|
||||||
#include <atomic>
|
#include <chrono>
|
||||||
#include <mutex>
|
|
||||||
#include <list>
|
|
||||||
|
|
||||||
class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public NGCFT1EventI, public NGCEXTEventI {
|
class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public NGCFT1EventI, public NGCEXTEventI {
|
||||||
ObjectStore2& _os;
|
ObjectStore2& _os;
|
||||||
@ -30,6 +30,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
|||||||
ToxEventProviderI& _tep;
|
ToxEventProviderI& _tep;
|
||||||
NGCEXTEventProvider& _neep;
|
NGCEXTEventProvider& _neep;
|
||||||
|
|
||||||
|
Backends::SHA1MappedFilesystem _mfb;
|
||||||
|
|
||||||
std::minstd_rand _rng {1337*11};
|
std::minstd_rand _rng {1337*11};
|
||||||
|
|
||||||
using clock = std::chrono::steady_clock;
|
using clock = std::chrono::steady_clock;
|
||||||
@ -67,11 +69,6 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
|||||||
// only used to remove participation on peer exit
|
// only used to remove participation on peer exit
|
||||||
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
|
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
|
||||||
|
|
||||||
std::atomic_bool _info_builder_dirty {false};
|
|
||||||
std::mutex _info_builder_queue_mutex;
|
|
||||||
using InfoBuilderEntry = std::function<void(void)>;
|
|
||||||
std::list<InfoBuilderEntry> _info_builder_queue;
|
|
||||||
|
|
||||||
void updateMessages(ObjectHandle ce);
|
void updateMessages(ObjectHandle ce);
|
||||||
|
|
||||||
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ObjectHandle ce);
|
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ObjectHandle ce);
|
||||||
@ -97,6 +94,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
|||||||
|
|
||||||
float iterate(float delta);
|
float iterate(float delta);
|
||||||
|
|
||||||
|
void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts);
|
||||||
|
|
||||||
protected: // rmm events (actions)
|
protected: // rmm events (actions)
|
||||||
bool onEvent(const Message::Events::MessageUpdated&) override;
|
bool onEvent(const Message::Events::MessageUpdated&) override;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user