and use new backend code (partially transitioned to os backend)
This commit is contained in:
parent
e50e74e12f
commit
54ace9d0b2
@ -29,7 +29,6 @@
|
||||
|
||||
#include <iostream>
|
||||
#include <filesystem>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
namespace Message::Components {
|
||||
@ -193,7 +192,8 @@ SHA1_NGCFT1::SHA1_NGCFT1(
|
||||
_nft(nft),
|
||||
_tcm(tcm),
|
||||
_tep(tep),
|
||||
_neep(neep)
|
||||
_neep(neep),
|
||||
_mfb(os)
|
||||
{
|
||||
// TODO: also create and destroy
|
||||
_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
|
||||
@ -221,17 +221,7 @@ SHA1_NGCFT1::SHA1_NGCFT1(
|
||||
|
||||
float SHA1_NGCFT1::iterate(float delta) {
|
||||
//std::cerr << "---------- new tick ----------\n";
|
||||
// info builder queue
|
||||
if (_info_builder_dirty) {
|
||||
std::lock_guard l{_info_builder_queue_mutex};
|
||||
_info_builder_dirty = false; // set while holding lock
|
||||
|
||||
for (auto& it : _info_builder_queue) {
|
||||
//it.fn();
|
||||
it();
|
||||
}
|
||||
_info_builder_queue.clear();
|
||||
}
|
||||
_mfb.tick(); // does not need to be called as often, once every sec would be enough, but the pointer deref + atomic bool should be very fast
|
||||
|
||||
entt::dense_map<Contact3, size_t> peer_open_requests;
|
||||
|
||||
@ -440,6 +430,89 @@ float SHA1_NGCFT1::iterate(float delta) {
|
||||
}
|
||||
}
|
||||
|
||||
// gets called back on main thread after a "new" file info got built on a different thread
|
||||
void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts) {
|
||||
// sanity
|
||||
if (!o.all_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Hash>()) {
|
||||
assert(false);
|
||||
return;
|
||||
}
|
||||
|
||||
// update content lookup
|
||||
const auto& info_hash = o.get<Components::FT1InfoSHA1Hash>().hash;
|
||||
_info_to_content[info_hash] = o;
|
||||
|
||||
// update chunk lookup
|
||||
const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
|
||||
const auto& info = o.get<Components::FT1InfoSHA1>();
|
||||
for (size_t i = 0; i < info.chunks.size(); i++) {
|
||||
_chunks[info.chunks[i]] = o;
|
||||
}
|
||||
|
||||
// remove from info request queue
|
||||
if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), o); it != _queue_content_want_info.end()) {
|
||||
_queue_content_want_info.erase(it);
|
||||
}
|
||||
|
||||
// TODO: we dont want chunks anymore
|
||||
// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
|
||||
|
||||
// something happend, update all chunk pickers
|
||||
if (o.all_of<Components::SuspectedParticipants>()) {
|
||||
for (const auto& pcv : o.get<Components::SuspectedParticipants>().participants) {
|
||||
Contact3Handle pch{_cr, pcv};
|
||||
assert(static_cast<bool>(pch));
|
||||
pch.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
}
|
||||
}
|
||||
|
||||
// create message
|
||||
const auto c_self = _cr.get<Contact::Components::Self>(c).self;
|
||||
if (!_cr.valid(c_self)) {
|
||||
std::cerr << "SHA1_NGCFT1 error: failed to get self!\n";
|
||||
return;
|
||||
}
|
||||
|
||||
const auto msg_e = reg_ptr->create();
|
||||
reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
|
||||
reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
|
||||
reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
|
||||
reg_ptr->emplace<Message::Components::Read>(msg_e, ts);
|
||||
|
||||
reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e);
|
||||
reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
|
||||
|
||||
o.get_or_emplace<Components::Messages>().messages.push_back({*reg_ptr, msg_e});
|
||||
|
||||
//reg_ptr->emplace<Message::Components::Transfer::FileKind>(e, file_kind);
|
||||
// file id would be sha1_info hash or something
|
||||
//reg_ptr->emplace<Message::Components::Transfer::FileID>(e, file_id);
|
||||
|
||||
if (_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
|
||||
const uint32_t group_number = _cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
|
||||
uint32_t message_id = 0;
|
||||
|
||||
// TODO: check return
|
||||
_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), info_hash.data(), info_hash.size());
|
||||
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
|
||||
} else if (
|
||||
// non online group
|
||||
_cr.any_of<Contact::Components::ToxGroupPersistent>(c)
|
||||
) {
|
||||
// create msg_id
|
||||
const uint32_t message_id = randombytes_random();
|
||||
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
|
||||
} // TODO: else private message
|
||||
|
||||
reg_ptr->get_or_emplace<Message::Components::SyncedBy>(msg_e).ts.try_emplace(c_self, ts);
|
||||
reg_ptr->get_or_emplace<Message::Components::ReceivedBy>(msg_e).ts.try_emplace(c_self, ts);
|
||||
|
||||
_rmm.throwEventConstruct(*reg_ptr, msg_e);
|
||||
|
||||
// TODO: place in iterate?
|
||||
updateMessages(o);
|
||||
}
|
||||
|
||||
bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
|
||||
// see tox_transfer_manager.cpp for reference
|
||||
if (!e.e.all_of<Message::Components::Transfer::ActionAccept, Message::Components::Content>()) {
|
||||
@ -990,6 +1063,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
);
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (!cc.have_all) { // debug print self have set
|
||||
std::cout << "DEBUG print have bitset: s:" << cc.have_chunk.size_bits();
|
||||
for (size_t i = 0; i < cc.have_chunk.size_bytes(); i++) {
|
||||
@ -1001,6 +1075,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
|
||||
}
|
||||
@ -1176,252 +1251,10 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
|
||||
// get current time unix epoch utc
|
||||
uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
|
||||
std::thread(std::move([
|
||||
// copy everything
|
||||
self = this,
|
||||
ts,
|
||||
c,
|
||||
reg_ptr,
|
||||
file_name_ = std::string(file_name),
|
||||
file_path_ = std::string(file_path)
|
||||
]() mutable {
|
||||
std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(file_path_, -1);
|
||||
if (!file_impl->isGood()) {
|
||||
{
|
||||
std::lock_guard l{self->_info_builder_queue_mutex};
|
||||
self->_info_builder_queue.push_back([file_path_](){
|
||||
// back on iterate thread
|
||||
|
||||
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n";
|
||||
});
|
||||
self->_info_builder_dirty = true; // still in scope, set before mutex unlock
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. build info by hashing all chunks
|
||||
|
||||
FT1InfoSHA1 sha1_info;
|
||||
// build info
|
||||
sha1_info.file_name = file_name_;
|
||||
sha1_info.file_size = file_impl->_file_size; // TODO: remove the reliance on implementation details
|
||||
|
||||
{ // build chunks
|
||||
// HACK: load file fully
|
||||
// ... its only a hack if its not memory mapped, but reading in chunk_sized chunks is probably a good idea anyway
|
||||
const auto file_data = file_impl->read(file_impl->_file_size, 0);
|
||||
size_t i = 0;
|
||||
for (; i + sha1_info.chunk_size < file_data.size; i += sha1_info.chunk_size) {
|
||||
sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, sha1_info.chunk_size));
|
||||
}
|
||||
|
||||
if (i < file_data.size) {
|
||||
sha1_info.chunks.push_back(hash_sha1(file_data.ptr+i, file_data.size-i));
|
||||
}
|
||||
}
|
||||
|
||||
file_impl.reset();
|
||||
|
||||
{
|
||||
std::lock_guard l{self->_info_builder_queue_mutex};
|
||||
self->_info_builder_queue.push_back(std::move([
|
||||
self,
|
||||
ts,
|
||||
c,
|
||||
reg_ptr,
|
||||
file_name_,
|
||||
file_path_,
|
||||
sha1_info = std::move(sha1_info)
|
||||
]() mutable { //
|
||||
// back on iterate thread
|
||||
|
||||
std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(file_path_, sha1_info.file_size);
|
||||
if (!file_impl->isGood()) {
|
||||
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path_ << "'!\n";
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. hash info
|
||||
std::vector<uint8_t> sha1_info_data;
|
||||
std::vector<uint8_t> sha1_info_hash;
|
||||
|
||||
std::cout << "SHA1_NGCFT1 info is: \n" << sha1_info;
|
||||
sha1_info_data = sha1_info.toBuffer();
|
||||
std::cout << "SHA1_NGCFT1 sha1_info size: " << sha1_info_data.size() << "\n";
|
||||
sha1_info_hash = hash_sha1(sha1_info_data.data(), sha1_info_data.size());
|
||||
std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n";
|
||||
|
||||
// check if content exists
|
||||
ObjectHandle ce;
|
||||
if (self->_info_to_content.count(sha1_info_hash)) {
|
||||
ce = self->_info_to_content.at(sha1_info_hash);
|
||||
|
||||
// TODO: check if content is incomplete and use file instead
|
||||
if (!ce.all_of<Components::FT1InfoSHA1>()) {
|
||||
ce.emplace<Components::FT1InfoSHA1>(sha1_info);
|
||||
}
|
||||
if (!ce.all_of<Components::FT1InfoSHA1Data>()) {
|
||||
ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data);
|
||||
}
|
||||
|
||||
// hash has to be set already
|
||||
// Components::FT1InfoSHA1Hash
|
||||
|
||||
{ // lookup tables and have
|
||||
auto& cc = ce.get_or_emplace<Components::FT1ChunkSHA1Cache>();
|
||||
cc.have_all = true;
|
||||
// skip have vec, since all
|
||||
//cc.have_chunk
|
||||
cc.have_count = sha1_info.chunks.size(); // need?
|
||||
|
||||
self->_info_to_content[sha1_info_hash] = ce;
|
||||
cc.chunk_hash_to_index.clear(); // for cpy pst
|
||||
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
|
||||
self->_chunks[sha1_info.chunks[i]] = ce;
|
||||
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
|
||||
}
|
||||
}
|
||||
|
||||
{ // file info
|
||||
// TODO: not overwrite fi? since same?
|
||||
auto& file_info = ce.emplace_or_replace<Message::Components::Transfer::FileInfo>();
|
||||
file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
|
||||
file_info.total_size = file_impl->_file_size;
|
||||
|
||||
ce.emplace_or_replace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
|
||||
}
|
||||
|
||||
// cleanup file
|
||||
if (ce.all_of<Message::Components::Transfer::File>()) {
|
||||
// replace
|
||||
ce.remove<Message::Components::Transfer::File>();
|
||||
}
|
||||
ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
|
||||
|
||||
if (!ce.all_of<Message::Components::Transfer::BytesSent>()) {
|
||||
ce.emplace<Message::Components::Transfer::BytesSent>(0u);
|
||||
}
|
||||
|
||||
ce.remove<Message::Components::Transfer::TagPaused>();
|
||||
|
||||
// we dont want the info anymore
|
||||
ce.remove<Components::ReRequestInfoTimer>();
|
||||
if (auto it = std::find(self->_queue_content_want_info.begin(), self->_queue_content_want_info.end(), ce); it != self->_queue_content_want_info.end()) {
|
||||
self->_queue_content_want_info.erase(it);
|
||||
}
|
||||
|
||||
// TODO: we dont want chunks anymore
|
||||
// TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle)
|
||||
} else {
|
||||
// TODO: backend
|
||||
ce = {self->_os.registry(), self->_os.registry().create()};
|
||||
self->_info_to_content[sha1_info_hash] = ce;
|
||||
|
||||
ce.emplace<Components::FT1InfoSHA1>(sha1_info);
|
||||
ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
|
||||
ce.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
|
||||
{ // lookup tables and have
|
||||
auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>();
|
||||
cc.have_all = true;
|
||||
// skip have vec, since all
|
||||
//cc.have_chunk
|
||||
cc.have_count = sha1_info.chunks.size(); // need?
|
||||
|
||||
self->_info_to_content[sha1_info_hash] = ce;
|
||||
cc.chunk_hash_to_index.clear(); // for cpy pst
|
||||
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
|
||||
self->_chunks[sha1_info.chunks[i]] = ce;
|
||||
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
|
||||
}
|
||||
}
|
||||
|
||||
{ // file info
|
||||
auto& file_info = ce.emplace<Message::Components::Transfer::FileInfo>();
|
||||
//const auto& file = ce.get<Message::Components::Transfer::File>();
|
||||
file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
|
||||
file_info.total_size = file_impl->_file_size;
|
||||
|
||||
ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
|
||||
}
|
||||
|
||||
ce.emplace<Message::Components::Transfer::File>(std::move(file_impl));
|
||||
|
||||
ce.emplace<Message::Components::Transfer::BytesSent>(0u);
|
||||
}
|
||||
|
||||
// something happend, update all chunk pickers
|
||||
if (ce.all_of<Components::SuspectedParticipants>()) {
|
||||
for (const auto& pcv : ce.get<Components::SuspectedParticipants>().participants) {
|
||||
Contact3Handle pch{self->_cr, pcv};
|
||||
assert(static_cast<bool>(pch));
|
||||
pch.emplace_or_replace<ChunkPickerUpdateTag>();
|
||||
}
|
||||
}
|
||||
|
||||
const auto c_self = self->_cr.get<Contact::Components::Self>(c).self;
|
||||
if (!self->_cr.valid(c_self)) {
|
||||
std::cerr << "SHA1_NGCFT1 error: failed to get self!\n";
|
||||
return;
|
||||
}
|
||||
|
||||
const auto msg_e = reg_ptr->create();
|
||||
reg_ptr->emplace<Message::Components::ContactTo>(msg_e, c);
|
||||
reg_ptr->emplace<Message::Components::ContactFrom>(msg_e, c_self);
|
||||
reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
|
||||
reg_ptr->emplace<Message::Components::Read>(msg_e, ts);
|
||||
|
||||
reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e);
|
||||
reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
|
||||
|
||||
ce.get_or_emplace<Components::Messages>().messages.push_back({*reg_ptr, msg_e});
|
||||
|
||||
|
||||
//reg_ptr->emplace<Message::Components::Transfer::FileKind>(e, file_kind);
|
||||
// file id would be sha1_info hash or something
|
||||
//reg_ptr->emplace<Message::Components::Transfer::FileID>(e, file_id);
|
||||
|
||||
// remove? done in updateMessages() anyway
|
||||
if (ce.all_of<Message::Components::Transfer::FileInfo>()) {
|
||||
reg_ptr->emplace<Message::Components::Transfer::FileInfo>(msg_e, ce.get<Message::Components::Transfer::FileInfo>());
|
||||
}
|
||||
if (ce.all_of<Message::Components::Transfer::FileInfoLocal>()) {
|
||||
reg_ptr->emplace<Message::Components::Transfer::FileInfoLocal>(msg_e, ce.get<Message::Components::Transfer::FileInfoLocal>());
|
||||
}
|
||||
if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
|
||||
reg_ptr->emplace<Message::Components::Transfer::BytesSent>(msg_e, ce.get<Message::Components::Transfer::BytesSent>());
|
||||
}
|
||||
|
||||
// TODO: determine if this is true
|
||||
//reg_ptr->emplace<Message::Components::Transfer::TagPaused>(e);
|
||||
|
||||
if (self->_cr.any_of<Contact::Components::ToxGroupEphemeral>(c)) {
|
||||
const uint32_t group_number = self->_cr.get<Contact::Components::ToxGroupEphemeral>(c).group_number;
|
||||
uint32_t message_id = 0;
|
||||
|
||||
// TODO: check return
|
||||
self->_nft.NGC_FT1_send_message_public(group_number, message_id, static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size());
|
||||
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
|
||||
} else if (
|
||||
// non online group
|
||||
self->_cr.any_of<Contact::Components::ToxGroupPersistent>(c)
|
||||
) {
|
||||
// create msg_id
|
||||
const uint32_t message_id = randombytes_random();
|
||||
reg_ptr->emplace<Message::Components::ToxGroupMessageID>(msg_e, message_id);
|
||||
}
|
||||
|
||||
reg_ptr->get_or_emplace<Message::Components::SyncedBy>(msg_e).ts.try_emplace(c_self, ts);
|
||||
reg_ptr->get_or_emplace<Message::Components::ReceivedBy>(msg_e).ts.try_emplace(c_self, ts);
|
||||
|
||||
self->_rmm.throwEventConstruct(*reg_ptr, msg_e);
|
||||
|
||||
// TODO: place in iterate?
|
||||
self->updateMessages(ce);
|
||||
|
||||
}));
|
||||
self->_info_builder_dirty = true; // still in scope, set before mutex unlock
|
||||
}
|
||||
})).detach();
|
||||
_mfb.newFromFile(
|
||||
file_name, file_path,
|
||||
[this, reg_ptr, c, ts](ObjectHandle o) { onSendFileHashFinished(o, reg_ptr, c, ts); }
|
||||
);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -13,12 +13,12 @@
|
||||
#include "./sending_transfers.hpp"
|
||||
#include "./receiving_transfers.hpp"
|
||||
|
||||
#include "./backends/sha1_mapped_filesystem.hpp"
|
||||
|
||||
#include <entt/container/dense_map.hpp>
|
||||
|
||||
#include <random>
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
#include <list>
|
||||
#include <chrono>
|
||||
|
||||
class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public NGCFT1EventI, public NGCEXTEventI {
|
||||
ObjectStore2& _os;
|
||||
@ -30,6 +30,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
||||
ToxEventProviderI& _tep;
|
||||
NGCEXTEventProvider& _neep;
|
||||
|
||||
Backends::SHA1MappedFilesystem _mfb;
|
||||
|
||||
std::minstd_rand _rng {1337*11};
|
||||
|
||||
using clock = std::chrono::steady_clock;
|
||||
@ -67,11 +69,6 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
||||
// only used to remove participation on peer exit
|
||||
entt::dense_map<uint64_t, Contact3Handle> _tox_peer_to_contact;
|
||||
|
||||
std::atomic_bool _info_builder_dirty {false};
|
||||
std::mutex _info_builder_queue_mutex;
|
||||
using InfoBuilderEntry = std::function<void(void)>;
|
||||
std::list<InfoBuilderEntry> _info_builder_queue;
|
||||
|
||||
void updateMessages(ObjectHandle ce);
|
||||
|
||||
std::optional<std::pair<uint32_t, uint32_t>> selectPeerForRequest(ObjectHandle ce);
|
||||
@ -97,6 +94,8 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
|
||||
|
||||
float iterate(float delta);
|
||||
|
||||
void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts);
|
||||
|
||||
protected: // rmm events (actions)
|
||||
bool onEvent(const Message::Events::MessageUpdated&) override;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user