adopt to new os and message file refactor

This commit is contained in:
Green Sky 2024-07-24 17:55:31 +02:00
parent 16cb755191
commit da406714ff
No known key found for this signature in database
9 changed files with 284 additions and 308 deletions

View File

@ -1,6 +1,7 @@
#include "./sha1_mapped_filesystem.hpp" #include "./sha1_mapped_filesystem.hpp"
#include <solanaceae/object_store/meta_components.hpp> #include <solanaceae/object_store/meta_components.hpp>
#include <solanaceae/object_store/meta_components_file.hpp>
#include "../file_constructor.hpp" #include "../file_constructor.hpp"
#include "../ft1_sha1_info.hpp" #include "../ft1_sha1_info.hpp"
@ -138,10 +139,7 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
o = {_os.registry(), it_ov}; o = {_os.registry(), it_ov};
} }
} }
//if (self->info_to_content.count(sha1_info_hash)) {
if (static_cast<bool>(o)) { if (static_cast<bool>(o)) {
//ce = self->info_to_content.at(sha1_info_hash);
// TODO: check if content is incomplete and use file instead // TODO: check if content is incomplete and use file instead
if (!o.all_of<Components::FT1InfoSHA1>()) { if (!o.all_of<Components::FT1InfoSHA1>()) {
o.emplace<Components::FT1InfoSHA1>(sha1_info); o.emplace<Components::FT1InfoSHA1>(sha1_info);
@ -153,32 +151,9 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
// hash has to be set already // hash has to be set already
// Components::FT1InfoSHA1Hash // Components::FT1InfoSHA1Hash
{ // lookup tables and have
auto& cc = o.get_or_emplace<Components::FT1ChunkSHA1Cache>();
cc.have_all = true;
// skip have vec, since all
//cc.have_chunk
cc.have_count = sha1_info.chunks.size(); // need?
//self->_info_to_content[sha1_info_hash] = ce;
cc.chunk_hash_to_index.clear(); // for cpy pst
for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
//self->_chunks[sha1_info.chunks[i]] = ce;
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
}
}
{ // file info
// TODO: not overwrite fi? since same?
auto& file_info = o.emplace_or_replace<Message::Components::Transfer::FileInfo>();
file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
file_info.total_size = file_impl->_file_size;
o.emplace_or_replace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
}
// hmmm // hmmm
o.remove<Message::Components::Transfer::TagPaused>(); // TODO: we need a replacement for this
o.remove<ObjComp::Ephemeral::File::TagTransferPaused>();
// we dont want the info anymore // we dont want the info anymore
o.remove<Components::ReRequestInfoTimer>(); o.remove<Components::ReRequestInfoTimer>();
@ -188,32 +163,33 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
o.emplace<Components::FT1InfoSHA1>(sha1_info); o.emplace<Components::FT1InfoSHA1>(sha1_info);
o.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file? o.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash); o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
{ // lookup tables and have }
auto& cc = o.emplace<Components::FT1ChunkSHA1Cache>();
cc.have_all = true;
// skip have vec, since all
cc.have_count = sha1_info.chunks.size(); // need?
cc.chunk_hash_to_index.clear(); // for cpy pst { // lookup tables and have
for (size_t i = 0; i < sha1_info.chunks.size(); i++) { auto& cc = o.get_or_emplace<Components::FT1ChunkSHA1Cache>();
cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); // skip have vec, since all
} cc.have_count = sha1_info.chunks.size(); // need?
}
{ // file info cc.chunk_hash_to_index.clear(); // for cpy pst
auto& file_info = o.emplace<Message::Components::Transfer::FileInfo>(); for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size}; cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
file_info.total_size = file_impl->_file_size;
o.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{std::string{file_path_}});
} }
} }
o.emplace_or_replace<Message::Components::Transfer::File>(std::move(file_impl)); o.emplace_or_replace<ObjComp::F::TagLocalHaveAll>();
o.remove<ObjComp::F::LocalHaveBitset>();
// TODO: replace with transfers stats { // file info
if (!o.all_of<Message::Components::Transfer::BytesSent>()) { // TODO: not overwrite fi? since same?
o.emplace<Message::Components::Transfer::BytesSent>(0u); o.emplace_or_replace<ObjComp::F::SingleInfo>(file_name_, file_impl->_file_size);
o.emplace_or_replace<ObjComp::F::SingleInfoLocal>(file_path_);
o.emplace_or_replace<ObjComp::Ephemeral::FilePath>(file_path_); // ?
}
o.emplace_or_replace<Components::FT1File2>(std::move(file_impl));
if (!o.all_of<ObjComp::Ephemeral::File::TransferStats>()) {
o.emplace<ObjComp::Ephemeral::File::TransferStats>();
} }
cb(o); cb(o);
@ -226,23 +202,33 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
} }
std::unique_ptr<File2I> SHA1MappedFilesystem::file2(Object ov, FILE2_FLAGS flags) { std::unique_ptr<File2I> SHA1MappedFilesystem::file2(Object ov, FILE2_FLAGS flags) {
if (flags & FILE2_RAW) {
std::cerr << "SHA1MF error: does not support raw modes\n";
return nullptr;
}
ObjectHandle o{_os.registry(), ov}; ObjectHandle o{_os.registry(), ov};
if (!static_cast<bool>(o)) { if (!static_cast<bool>(o)) {
return nullptr; return nullptr;
} }
if (!o.all_of<Message::Components::Transfer::FileInfoLocal>()) { // will this do if we go and support enc?
// use ObjComp::Ephemeral::FilePath instead??
if (!o.all_of<ObjComp::F::SingleInfoLocal>()) {
return nullptr; return nullptr;
} }
const auto& file_list = o.get<Message::Components::Transfer::FileInfoLocal>().file_list; const auto& file_path = o.get<ObjComp::F::SingleInfoLocal>().file_path;
if (file_list.empty()) { if (file_path.empty()) {
return nullptr; return nullptr;
} }
auto res = construct_file2_rw_mapped(file_list.front(), -1); // TODO: read-only one too
// since they are mapped, is this efficent to have multiple?
auto res = construct_file2_rw_mapped(file_path, -1);
if (!res || !res->isGood()) { if (!res || !res->isGood()) {
std::cerr << "SHA1MF error: failed constructing mapped file '" << file_path << "'\n";
return nullptr; return nullptr;
} }

View File

@ -32,7 +32,7 @@ struct SHA1MappedFilesystem : public StorageBackendI {
// might return pre-existing? // might return pre-existing?
ObjectHandle newFromInfoHash(ByteSpan info_hash); ObjectHandle newFromInfoHash(ByteSpan info_hash);
std::unique_ptr<File2I> file2(Object o, FILE2_FLAGS flags); // default does nothing std::unique_ptr<File2I> file2(Object o, FILE2_FLAGS flags) override;
}; };
} // Backends } // Backends

View File

@ -1,10 +1,11 @@
#include "./chunk_picker.hpp" #include "./chunk_picker.hpp"
#include <solanaceae/tox_contacts/components.hpp> #include <solanaceae/tox_contacts/components.hpp>
#include "./components.hpp"
#include "./contact_components.hpp" #include "./contact_components.hpp"
#include <solanaceae/object_store/meta_components_file.hpp>
#include "./components.hpp"
#include <algorithm> #include <algorithm>
#include <iostream> #include <iostream>
@ -140,37 +141,39 @@ void ChunkPicker::updateParticipation(
continue; continue;
} }
if (o.all_of<Message::Components::Transfer::TagPaused>()) { if (o.all_of<ObjComp::Ephemeral::File::TagTransferPaused>()) {
participating_unfinished.erase(o); participating_unfinished.erase(o);
continue; continue;
} }
if (o.get<Components::FT1ChunkSHA1Cache>().have_all) { if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
participating_unfinished.erase(o); participating_unfinished.erase(o);
continue;
} }
} else { } else {
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) { if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
continue; continue;
} }
if (o.all_of<Message::Components::Transfer::TagPaused>()) { if (o.all_of<ObjComp::Ephemeral::File::TagTransferPaused>()) {
continue; continue;
} }
if (!o.get<Components::FT1ChunkSHA1Cache>().have_all) { if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
using Priority = Components::DownloadPriority::Priority; //using Priority = Components::DownloadPriority::Priority;
using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
Priority prio = Priority::NORMAL; Priority prio = Priority::NORMAL;
if (o.all_of<Components::DownloadPriority>()) { if (o.all_of<ObjComp::Ephemeral::File::DownloadPriority>()) {
prio = o.get<Components::DownloadPriority>().p; prio = o.get<ObjComp::Ephemeral::File::DownloadPriority>().p;
} }
uint16_t pskips = uint16_t pskips =
prio == Priority::HIGHER ? 0u : prio == Priority::HIGHEST ? 0u :
prio == Priority::HIGH ? 1u : prio == Priority::HIGH ? 1u :
prio == Priority::NORMAL ? 2u : prio == Priority::NORMAL ? 2u :
prio == Priority::LOW ? 4u : prio == Priority::LOW ? 4u :
8u 8u // LOWEST
; ;
participating_unfinished.emplace(o, ParticipationEntry{pskips}); participating_unfinished.emplace(o, ParticipationEntry{pskips});
@ -255,18 +258,19 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
ObjectHandle o {objreg, it->first}; ObjectHandle o {objreg, it->first};
// intersect self have with other have // intersect self have with other have
if (!o.all_of<Components::RemoteHave, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) { if (!o.all_of<Components::RemoteHaveBitset, Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1>()) {
// rare case where no one else has anything // rare case where no one else has anything
continue; continue;
} }
const auto& cc = o.get<Components::FT1ChunkSHA1Cache>(); if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
if (cc.have_all) {
std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n"; std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n";
continue; continue;
} }
const auto& others_have = o.get<Components::RemoteHave>().others; //const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
const auto& others_have = o.get<Components::RemoteHaveBitset>().others;
auto other_it = others_have.find(c); auto other_it = others_have.find(c);
if (other_it == others_have.end()) { if (other_it == others_have.end()) {
// rare case where the other is participating but has nothing // rare case where the other is participating but has nothing
@ -275,7 +279,14 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
const auto& other_have = other_it->second; const auto& other_have = other_it->second;
BitSet chunk_candidates = cc.have_chunk; const auto& info = o.get<Components::FT1InfoSHA1>();
const auto total_chunks = info.chunks.size();
const auto* lhb = o.try_get<ObjComp::F::LocalHaveBitset>();
// if we dont have anything, this might not exist yet
BitSet chunk_candidates = lhb == nullptr ? BitSet{total_chunks} : lhb->have;
if (!other_have.have_all) { if (!other_have.have_all) {
// AND is the same as ~(~A | ~B) // AND is the same as ~(~A | ~B)
// that means we leave chunk_candidates as (have is inverted want) // that means we leave chunk_candidates as (have is inverted want)
@ -288,8 +299,6 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
} else { } else {
chunk_candidates.invert(); chunk_candidates.invert();
} }
const auto& info = o.get<Components::FT1InfoSHA1>();
const auto total_chunks = info.chunks.size();
auto& requested_chunks = o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks; auto& requested_chunks = o.get_or_emplace<Components::FT1ChunkSHA1Requested>().chunks;
// TODO: trim off round up to 8, since they are now always set // TODO: trim off round up to 8, since they are now always set
@ -306,8 +315,8 @@ std::vector<ChunkPicker::ContentChunkR> ChunkPicker::updateChunkRequests(
// TODO: configurable // TODO: configurable
size_t start_offset {0u}; size_t start_offset {0u};
if (o.all_of<Components::ReadHeadHint>()) { if (o.all_of<ObjComp::Ephemeral::File::ReadHeadHint>()) {
const auto byte_offset = o.get<Components::ReadHeadHint>().offset_into_file; const auto byte_offset = o.get<ObjComp::Ephemeral::File::ReadHeadHint>().offset_into_file;
if (byte_offset <= info.file_size) { if (byte_offset <= info.file_size) {
start_offset = byte_offset/info.chunk_size; start_offset = byte_offset/info.chunk_size;
} else { } else {

View File

@ -1,5 +1,7 @@
#include "./components.hpp" #include "./components.hpp"
#include <solanaceae/object_store/meta_components_file.hpp>
namespace Components { namespace Components {
std::vector<size_t> FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const { std::vector<size_t> FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const {
@ -11,14 +13,20 @@ std::vector<size_t> FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) cons
} }
} }
bool FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const { bool FT1ChunkSHA1Cache::haveChunk(ObjectHandle o, const SHA1Digest& hash) const {
if (have_all) { // short cut if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
return true; return true;
} }
const auto* lhb = o.try_get<ObjComp::F::LocalHaveBitset>();
if (lhb == nullptr) {
return false; // we dont have anything yet
}
if (auto i_vec = chunkIndices(hash); !i_vec.empty()) { if (auto i_vec = chunkIndices(hash); !i_vec.empty()) {
// TODO: should i test all? // TODO: should i test all?
return have_chunk[i_vec.front()]; //return have_chunk[i_vec.front()];
return lhb->have[i_vec.front()];
} }
// not part of this file // not part of this file

View File

@ -3,6 +3,7 @@
#include <solanaceae/contact/components.hpp> #include <solanaceae/contact/components.hpp>
#include <solanaceae/message3/components.hpp> #include <solanaceae/message3/components.hpp>
#include <solanaceae/message3/registry_message_model.hpp> #include <solanaceae/message3/registry_message_model.hpp>
#include <solanaceae/object_store/meta_components_file.hpp>
#include <solanaceae/util/bitset.hpp> #include <solanaceae/util/bitset.hpp>
@ -35,18 +36,24 @@ namespace Components {
}; };
struct FT1ChunkSHA1Cache { struct FT1ChunkSHA1Cache {
// TODO: extract have_chunk, have_all and have_count to generic comp // TODO: extract have_count to generic comp
// have_chunk is the size of info.chunks.size(), or empty if have_all // have_chunk is the size of info.chunks.size(), or empty if have_all
// keep in mind bitset rounds up to 8s // keep in mind bitset rounds up to 8s
BitSet have_chunk{0}; //BitSet have_chunk{0};
bool have_all {false}; //bool have_all {false};
size_t have_count {0}; size_t have_count {0}; // move?
entt::dense_map<SHA1Digest, std::vector<size_t>> chunk_hash_to_index; entt::dense_map<SHA1Digest, std::vector<size_t>> chunk_hash_to_index;
std::vector<size_t> chunkIndices(const SHA1Digest& hash) const; std::vector<size_t> chunkIndices(const SHA1Digest& hash) const;
bool haveChunk(const SHA1Digest& hash) const; bool haveChunk(ObjectHandle o, const SHA1Digest& hash) const;
};
struct FT1File2 {
// the cached file2 for faster access
// should be destroyed when no activity and recreated on demand
std::unique_ptr<File2I> file;
}; };
struct FT1ChunkSHA1Requested { struct FT1ChunkSHA1Requested {
@ -63,7 +70,7 @@ namespace Components {
entt::dense_set<Contact3> participants; entt::dense_set<Contact3> participants;
}; };
struct RemoteHave { struct RemoteHaveBitset {
struct Entry { struct Entry {
bool have_all {false}; bool have_all {false};
BitSet have; BitSet have;
@ -92,41 +99,8 @@ namespace Components {
void lower(void); void lower(void);
}; };
struct DownloadPriority {
// download/retreival priority in comparison to other objects
// not all backends implement this
// priority can be weak, meaning low priority dls will still get transfer activity, just less often
enum class Priority {
HIGHER,
HIGH,
NORMAL,
LOW,
LOWER,
} p = Priority::NORMAL;
};
struct ReadHeadHint {
// points to the first byte we want
// this is just a hint, that can be set from outside
// to guide the sequential "piece picker" strategy
// ? the strategy *should* set this to the first byte we dont yet have
uint64_t offset_into_file {0u};
};
// this is per object/content
// more aplicable than "separated", so should be supported by most backends
struct TransferStats {
// in bytes per second
float rate_up {0.f};
float rate_down {0.f};
// bytes
uint64_t total_up {0u};
uint64_t total_down {0u};
};
struct TransferStatsSeparated { struct TransferStatsSeparated {
entt::dense_map<Contact3, TransferStats> stats; entt::dense_map<Contact3, ObjComp::Ephemeral::File::TransferStats> stats;
}; };
// used to populate stats // used to populate stats

View File

@ -1,7 +1,7 @@
#include "./re_announce_systems.hpp" #include "./re_announce_systems.hpp"
#include "./components.hpp" #include "./components.hpp"
#include <solanaceae/message3/components.hpp> #include <solanaceae/object_store/meta_components_file.hpp>
#include <solanaceae/tox_contacts/components.hpp> #include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp> #include <solanaceae/ngc_ft1/ngcft1_file_kind.hpp>
#include <vector> #include <vector>
@ -18,11 +18,12 @@ void re_announce(
std::vector<Object> to_remove; std::vector<Object> to_remove;
os_reg.view<Components::ReAnnounceTimer>().each([&os_reg, &cr, &neep, &to_remove, delta](Object ov, Components::ReAnnounceTimer& rat) { os_reg.view<Components::ReAnnounceTimer>().each([&os_reg, &cr, &neep, &to_remove, delta](Object ov, Components::ReAnnounceTimer& rat) {
ObjectHandle o{os_reg, ov}; ObjectHandle o{os_reg, ov};
// if paused -> remove // TODO: pause
if (o.all_of<Message::Components::Transfer::TagPaused>()) { //// if paused -> remove
to_remove.push_back(ov); //if (o.all_of<Message::Components::Transfer::TagPaused>()) {
return; // to_remove.push_back(ov);
} // return;
//}
// if not downloading or info incomplete -> remove // if not downloading or info incomplete -> remove
if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) { if (!o.all_of<Components::FT1ChunkSHA1Cache, Components::FT1InfoSHA1Hash, Components::AnnounceTargets>()) {
@ -31,7 +32,7 @@ void re_announce(
return; return;
} }
if (o.get<Components::FT1ChunkSHA1Cache>().have_all) { if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
// transfer done, we stop announcing // transfer done, we stop announcing
to_remove.push_back(ov); to_remove.push_back(ov);
return; return;

View File

@ -6,6 +6,7 @@
#include <solanaceae/tox_contacts/components.hpp> #include <solanaceae/tox_contacts/components.hpp>
#include <solanaceae/message3/components.hpp> #include <solanaceae/message3/components.hpp>
#include <solanaceae/tox_messages/components.hpp> #include <solanaceae/tox_messages/components.hpp>
#include <solanaceae/object_store/meta_components_file.hpp>
#include "./util.hpp" #include "./util.hpp"
@ -31,12 +32,6 @@
#include <filesystem> #include <filesystem>
#include <vector> #include <vector>
namespace Message::Components {
using Content = ObjectHandle;
} // Message::Components
static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) { static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
if (chunk_index+1 == sha1_info.chunks.size()) { if (chunk_index+1 == sha1_info.chunks.size()) {
// last chunk // last chunk
@ -70,30 +65,15 @@ void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_numbe
_queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, obj, hash, 0.f)); _queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, obj, hash, 0.f));
} }
void SHA1_NGCFT1::updateMessages(ObjectHandle ce) { void SHA1_NGCFT1::updateMessages(ObjectHandle o) {
assert(ce.all_of<Components::Messages>()); assert(o.all_of<Components::Messages>());
for (auto msg : ce.get<Components::Messages>().messages) { for (auto msg : o.get<Components::Messages>().messages) {
if (ce.all_of<Message::Components::Transfer::FileInfo>() && !msg.all_of<Message::Components::Transfer::FileInfo>()) { msg.emplace_or_replace<Message::Components::MessageFileObject>(o);
msg.emplace<Message::Components::Transfer::FileInfo>(ce.get<Message::Components::Transfer::FileInfo>());
} // messages no long hold this info
if (ce.all_of<Message::Components::Transfer::FileInfoLocal>()) { // this should not update messages anymore but simply just update the object
msg.emplace_or_replace<Message::Components::Transfer::FileInfoLocal>(ce.get<Message::Components::Transfer::FileInfoLocal>()); // and receivers should listen for object updates (?)
}
if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
msg.emplace_or_replace<Message::Components::Transfer::BytesSent>(ce.get<Message::Components::Transfer::BytesSent>());
}
if (ce.all_of<Message::Components::Transfer::BytesReceived>()) {
msg.emplace_or_replace<Message::Components::Transfer::BytesReceived>(ce.get<Message::Components::Transfer::BytesReceived>());
}
if (ce.all_of<Message::Components::Transfer::TagPaused>()) {
msg.emplace_or_replace<Message::Components::Transfer::TagPaused>();
} else {
msg.remove<Message::Components::Transfer::TagPaused>();
}
if (auto* cc = ce.try_get<Components::FT1ChunkSHA1Cache>(); cc != nullptr && cc->have_all) {
msg.emplace_or_replace<Message::Components::Transfer::TagHaveAll>();
}
_rmm.throwEventUpdate(msg); _rmm.throwEventUpdate(msg);
} }
@ -177,6 +157,40 @@ void SHA1_NGCFT1::queueBitsetSendFull(Contact3Handle c, ObjectHandle o) {
_queue_send_bitset.push_back(QBitsetEntry{c, o}); _queue_send_bitset.push_back(QBitsetEntry{c, o});
} }
File2I* SHA1_NGCFT1::objGetFile2Write(ObjectHandle o) {
auto* file2_comp_ptr = o.try_get<Components::FT1File2>();
if (file2_comp_ptr == nullptr || !file2_comp_ptr->file || !file2_comp_ptr->file->can_write || !file2_comp_ptr->file->isGood()) {
// (re)request file2 from backend
auto new_file = _mfb.file2(o, StorageBackendI::FILE2_WRITE);
if (!new_file || !new_file->can_write || !new_file->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed to open object for writing\n";
return nullptr; // early out
}
file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file));
}
assert(file2_comp_ptr != nullptr);
assert(static_cast<bool>(file2_comp_ptr->file));
return file2_comp_ptr->file.get();
}
File2I* SHA1_NGCFT1::objGetFile2Read(ObjectHandle o) {
auto* file2_comp_ptr = o.try_get<Components::FT1File2>();
if (file2_comp_ptr == nullptr || !file2_comp_ptr->file || !file2_comp_ptr->file->can_read || !file2_comp_ptr->file->isGood()) {
// (re)request file2 from backend
auto new_file = _mfb.file2(o, StorageBackendI::FILE2_READ);
if (!new_file || !new_file->can_read || !new_file->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed to open object for reading\n";
return nullptr; // early out
}
file2_comp_ptr = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file));
}
assert(file2_comp_ptr != nullptr);
assert(static_cast<bool>(file2_comp_ptr->file));
return file2_comp_ptr->file.get();
}
SHA1_NGCFT1::SHA1_NGCFT1( SHA1_NGCFT1::SHA1_NGCFT1(
ObjectStore2& os, ObjectStore2& os,
Contact3Registry& cr, Contact3Registry& cr,
@ -196,9 +210,9 @@ SHA1_NGCFT1::SHA1_NGCFT1(
_mfb(os) _mfb(os)
{ {
// TODO: also create and destroy // TODO: also create and destroy
_rmm.subscribe(this, RegistryMessageModel_Event::message_updated); //_os.subscribe(this, ObjectStore_Event::object_construct);
//_rmm.subscribe(this, RegistryMessageModel_Event::message_construct); _os.subscribe(this, ObjectStore_Event::object_update);
//_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy); //_os.subscribe(this, ObjectStore_Event::object_destroy);
_nft.subscribe(this, NGCFT1_Event::recv_request); _nft.subscribe(this, NGCFT1_Event::recv_request);
_nft.subscribe(this, NGCFT1_Event::recv_init); _nft.subscribe(this, NGCFT1_Event::recv_init);
@ -289,20 +303,19 @@ float SHA1_NGCFT1::iterate(float delta) {
if (static_cast<bool>(qe.o) && static_cast<bool>(qe.c) && qe.c.all_of<Contact::Components::ToxGroupPeerEphemeral>() && qe.o.all_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Hash, Components::FT1ChunkSHA1Cache>()) { if (static_cast<bool>(qe.o) && static_cast<bool>(qe.c) && qe.c.all_of<Contact::Components::ToxGroupPeerEphemeral>() && qe.o.all_of<Components::FT1InfoSHA1, Components::FT1InfoSHA1Hash, Components::FT1ChunkSHA1Cache>()) {
const auto [group_number, peer_number] = qe.c.get<Contact::Components::ToxGroupPeerEphemeral>(); const auto [group_number, peer_number] = qe.c.get<Contact::Components::ToxGroupPeerEphemeral>();
const auto& info_hash = qe.o.get<Components::FT1InfoSHA1Hash>().hash; const auto& info_hash = qe.o.get<Components::FT1InfoSHA1Hash>().hash;
const auto& cc = qe.o.get<Components::FT1ChunkSHA1Cache>();
const auto& info = qe.o.get<Components::FT1InfoSHA1>(); const auto& info = qe.o.get<Components::FT1InfoSHA1>();
const auto total_chunks = info.chunks.size(); const auto total_chunks = info.chunks.size();
static constexpr size_t bits_per_packet {8u*512u}; static constexpr size_t bits_per_packet {8u*512u};
if (cc.have_all) { if (qe.o.all_of<ObjComp::F::TagLocalHaveAll>()) {
// send have all // send have all
_neep.send_ft1_have_all( _neep.send_ft1_have_all(
group_number, peer_number, group_number, peer_number,
static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO), static_cast<uint32_t>(NGCFT1_file_kind::HASH_SHA1_INFO),
info_hash.data(), info_hash.size() info_hash.data(), info_hash.size()
); );
} else { } else if (const auto* lhb = qe.o.try_get<ObjComp::F::LocalHaveBitset>(); lhb != nullptr) {
for (size_t i = 0; i < total_chunks; i += bits_per_packet) { for (size_t i = 0; i < total_chunks; i += bits_per_packet) {
size_t bits_this_packet = std::min<size_t>(bits_per_packet, total_chunks-i); size_t bits_this_packet = std::min<size_t>(bits_per_packet, total_chunks-i);
@ -310,7 +323,7 @@ float SHA1_NGCFT1::iterate(float delta) {
// TODO: optimize selective copy bitset // TODO: optimize selective copy bitset
for (size_t j = i; j < i+bits_this_packet; j++) { for (size_t j = i; j < i+bits_this_packet; j++) {
if (cc.have_chunk[j]) { if (lhb->have[j]) {
have.set(j-i); have.set(j-i);
} }
} }
@ -324,7 +337,7 @@ float SHA1_NGCFT1::iterate(float delta) {
have._bytes.data(), have.size_bytes() have._bytes.data(), have.size_bytes()
); );
} }
} } // else, we have nothing *shrug*
} }
_queue_send_bitset.pop_front(); _queue_send_bitset.pop_front();
@ -466,6 +479,9 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
} }
} }
// in both cases, private and public, c (contact to) is the target
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c);
// create message // create message
const auto c_self = _cr.get<Contact::Components::Self>(c).self; const auto c_self = _cr.get<Contact::Components::Self>(c).self;
if (!_cr.valid(c_self)) { if (!_cr.valid(c_self)) {
@ -479,8 +495,9 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive? reg_ptr->emplace<Message::Components::Timestamp>(msg_e, ts); // reactive?
reg_ptr->emplace<Message::Components::Read>(msg_e, ts); reg_ptr->emplace<Message::Components::Read>(msg_e, ts);
reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(msg_e); reg_ptr->emplace<Message::Components::MessageFileObject>(msg_e, o);
reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
//reg_ptr->emplace<Message::Components::Transfer::TagSending>(msg_e);
o.get_or_emplace<Components::Messages>().messages.push_back({*reg_ptr, msg_e}); o.get_or_emplace<Components::Messages>().messages.push_back({*reg_ptr, msg_e});
@ -510,29 +527,27 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
_rmm.throwEventConstruct(*reg_ptr, msg_e); _rmm.throwEventConstruct(*reg_ptr, msg_e);
// TODO: place in iterate? // TODO: place in iterate?
updateMessages(o); updateMessages(o); // nop // TODO: remove
} }
bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) { bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
// see tox_transfer_manager.cpp for reference if (!e.e.all_of<ObjComp::Ephemeral::File::ActionTransferAccept>()) {
if (!e.e.all_of<Message::Components::Transfer::ActionAccept, Message::Components::Content>()) {
return false; return false;
} }
//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path); if (!e.e.all_of<Components::FT1InfoSHA1>()) {
auto ce = e.e.get<Message::Components::Content>();
//if (!ce.all_of<Components::FT1InfoSHA1, Components::FT1ChunkSHA1Cache>()) {
if (!ce.all_of<Components::FT1InfoSHA1>()) {
// not ready to load yet, skip // not ready to load yet, skip
return false; return false;
} }
assert(!ce.all_of<Components::FT1ChunkSHA1Cache>()); assert(!e.e.all_of<ObjComp::F::TagLocalHaveAll>());
assert(!ce.all_of<Message::Components::Transfer::File>()); assert(!e.e.all_of<Components::FT1ChunkSHA1Cache>());
assert(!e.e.all_of<Components::FT1File2>());
//accept(e.e, e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path);
// first, open file for write(+readback) // first, open file for write(+readback)
std::string full_file_path{e.e.get<Message::Components::Transfer::ActionAccept>().save_to_path}; std::string full_file_path{e.e.get<ObjComp::Ephemeral::File::ActionTransferAccept>().save_to_path};
// TODO: replace with filesystem or something // TODO: replace with filesystem or something
// TODO: use bool in action !!!
if (full_file_path.back() != '/') { if (full_file_path.back() != '/') {
full_file_path += "/"; full_file_path += "/";
} }
@ -540,10 +555,10 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
// ensure dir exists // ensure dir exists
std::filesystem::create_directories(full_file_path); std::filesystem::create_directories(full_file_path);
const auto& info = ce.get<Components::FT1InfoSHA1>(); const auto& info = e.e.get<Components::FT1InfoSHA1>();
full_file_path += info.file_name; full_file_path += info.file_name;
ce.emplace<Message::Components::Transfer::FileInfoLocal>(std::vector{full_file_path}); e.e.emplace<ObjComp::F::SingleInfoLocal>(full_file_path);
const bool file_exists = std::filesystem::exists(full_file_path); const bool file_exists = std::filesystem::exists(full_file_path);
std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(full_file_path, info.file_size); std::unique_ptr<File2I> file_impl = construct_file2_rw_mapped(full_file_path, info.file_size);
@ -551,15 +566,17 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
if (!file_impl->isGood()) { if (!file_impl->isGood()) {
std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n"; std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
// we failed opening that filepath, so we should offer the user the oportunity to save it differently // we failed opening that filepath, so we should offer the user the oportunity to save it differently
e.e.remove<Message::Components::Transfer::ActionAccept>(); // stop e.e.remove<ObjComp::Ephemeral::File::ActionTransferAccept>(); // stop
return false; return false;
} }
{ // next, create chuck cache and check for existing data { // next, create chuck cache and check for existing data
auto& cc = ce.emplace<Components::FT1ChunkSHA1Cache>(); auto& transfer_stats = e.e.get_or_emplace<ObjComp::Ephemeral::File::TransferStats>();
auto& bytes_received = ce.get_or_emplace<Message::Components::Transfer::BytesReceived>().total; auto& lhb = e.e.get_or_emplace<ObjComp::F::LocalHaveBitset>();
cc.have_chunk = BitSet(info.chunks.size()); if (lhb.have.size_bytes() < info.chunks.size()/8) {
cc.have_all = false; lhb.have = BitSet{info.chunks.size()};
}
auto& cc = e.e.emplace<Components::FT1ChunkSHA1Cache>();
cc.have_count = 0; cc.have_count = 0;
cc.chunk_hash_to_index.clear(); // if copy pasta cc.chunk_hash_to_index.clear(); // if copy pasta
@ -576,9 +593,12 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
const bool data_equal = data_hash == info.chunks.at(i); const bool data_equal = data_hash == info.chunks.at(i);
if (data_equal) { if (data_equal) {
cc.have_chunk.set(i); lhb.have.set(i);
cc.have_count += 1; cc.have_count += 1;
bytes_received += chunk_size;
// TODO: replace with some progress counter?
// or move have_count/want_count or something?
transfer_stats.total_down += chunk_size;
//std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n"; //std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n";
} else { } else {
//std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n"; //std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n";
@ -587,59 +607,45 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
// error reading? // error reading?
} }
_chunks[info.chunks[i]] = ce; _chunks[info.chunks[i]] = e.e;
cc.chunk_hash_to_index[info.chunks[i]].push_back(i); cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
} }
std::cout << "preexisting " << cc.have_count << "/" << info.chunks.size() << "\n"; std::cout << "preexisting " << cc.have_count << "/" << info.chunks.size() << "\n";
if (cc.have_count >= info.chunks.size()) { if (cc.have_count >= info.chunks.size()) {
cc.have_all = true; e.e.emplace_or_replace<ObjComp::F::TagLocalHaveAll>();
//ce.remove<Message::Components::Transfer::BytesReceived>(); e.e.remove<ObjComp::F::LocalHaveBitset>();
} }
} else { } else {
for (size_t i = 0; i < info.chunks.size(); i++) { for (size_t i = 0; i < info.chunks.size(); i++) {
_chunks[info.chunks[i]] = ce; _chunks[info.chunks[i]] = e.e;
cc.chunk_hash_to_index[info.chunks[i]].push_back(i); cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
} }
} }
} }
ce.emplace<Message::Components::Transfer::File>(std::move(file_impl)); e.e.emplace_or_replace<Components::FT1File2>(std::move(file_impl));
// queue announce we are participating // queue announce that we are participating
// since this is the first time, we publicly announce to all e.e.get_or_emplace<Components::ReAnnounceTimer>(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
if (e.e.all_of<Message::Components::ContactFrom, Message::Components::ContactTo>()) {
const auto c_f = e.e.get<Message::Components::ContactFrom>().c;
const auto c_t = e.e.get<Message::Components::ContactTo>().c;
if (_cr.all_of<Contact::Components::ToxGroupEphemeral>(c_t)) { e.e.remove<ObjComp::Ephemeral::File::TagTransferPaused>();
// public
ce.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c_t);
} else if (_cr.all_of<Contact::Components::ToxGroupPeerEphemeral>(c_f)) {
// private ?
ce.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c_f);
}
}
ce.get_or_emplace<Components::ReAnnounceTimer>(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
ce.remove<Message::Components::Transfer::TagPaused>();
// start requesting from all participants // start requesting from all participants
if (ce.all_of<Components::SuspectedParticipants>()) { if (e.e.all_of<Components::SuspectedParticipants>()) {
std::cout << "accepted ft has " << ce.get<Components::SuspectedParticipants>().participants.size() << " sp\n"; std::cout << "accepted ft has " << e.e.get<Components::SuspectedParticipants>().participants.size() << " sp\n";
for (const auto cv : ce.get<Components::SuspectedParticipants>().participants) { for (const auto cv : e.e.get<Components::SuspectedParticipants>().participants) {
_cr.emplace_or_replace<ChunkPickerUpdateTag>(cv); _cr.emplace_or_replace<ChunkPickerUpdateTag>(cv);
} }
} else { } else {
std::cout << "accepted ft has NO sp!\n"; std::cout << "accepted ft has NO sp!\n";
} }
// should? e.e.remove<ObjComp::Ephemeral::File::ActionTransferAccept>();
e.e.remove<Message::Components::Transfer::ActionAccept>();
updateMessages(ce); updateMessages(e.e);
return false; return false; // ?
} }
bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
@ -717,7 +723,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
assert(o.all_of<Components::FT1ChunkSHA1Cache>()); assert(o.all_of<Components::FT1ChunkSHA1Cache>());
if (!o.get<Components::FT1ChunkSHA1Cache>().haveChunk(chunk_hash)) { if (!o.get<Components::FT1ChunkSHA1Cache>().haveChunk(o, chunk_hash)) {
// we dont have the chunk // we dont have the chunk
return false; return false;
} }
@ -793,7 +799,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
assert(o.all_of<Components::FT1ChunkSHA1Cache>()); assert(o.all_of<Components::FT1ChunkSHA1Cache>());
const auto& cc = o.get<Components::FT1ChunkSHA1Cache>(); const auto& cc = o.get<Components::FT1ChunkSHA1Cache>();
if (cc.haveChunk(sha1_chunk_hash)) { if (cc.haveChunk(o, sha1_chunk_hash)) {
std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n"; std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
// we have the chunk // we have the chunk
return false; return false;
@ -854,16 +860,16 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
} else if (transfer.isChunk()) { } else if (transfer.isChunk()) {
auto o = transfer.getChunk().content; auto o = transfer.getChunk().content;
assert(o.all_of<Message::Components::Transfer::File>());
auto* file = o.get<Message::Components::Transfer::File>().get();
assert(file != nullptr);
const auto chunk_size = o.get<Components::FT1InfoSHA1>().chunk_size; const auto chunk_size = o.get<Components::FT1InfoSHA1>().chunk_size;
for (const auto chunk_index : transfer.getChunk().chunk_indices) { for (const auto chunk_index : transfer.getChunk().chunk_indices) {
const auto offset_into_file = chunk_index * chunk_size; const auto offset_into_file = chunk_index * chunk_size;
if (!file->write({e.data, e.data_size}, offset_into_file + e.data_offset)) { auto* file2 = objGetFile2Write(o);
std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n"; if (file2 == nullptr) {
return false; // early out
}
if (!file2->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << entt::to_integral(o.entity()) << "@" << offset_into_file + e.data_offset << "\n";
} }
} }
@ -903,8 +909,14 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
} else if (transfer.isChunk()) { } else if (transfer.isChunk()) {
auto& chunk_transfer = transfer.getChunk(); auto& chunk_transfer = transfer.getChunk();
const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>(); const auto& info = chunk_transfer.content.get<Components::FT1InfoSHA1>();
// TODO: should we really use file?
const auto data = chunk_transfer.content.get<Message::Components::Transfer::File>()->read( auto* file2 = objGetFile2Read(chunk_transfer.content);
if (file2 == nullptr) {
// return true?
return false; // early out
}
const auto data = file2->read(
e.data_size, e.data_size,
(chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset (chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset
); );
@ -914,7 +926,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
e.data[i] = data[i]; e.data[i] = data[i];
} }
chunk_transfer.content.get_or_emplace<Message::Components::Transfer::BytesSent>().total += data.size;
// TODO: add event to propergate to messages // TODO: add event to propergate to messages
//_rmm.throwEventUpdate(transfer); // should we? //_rmm.throwEventUpdate(transfer); // should we?
@ -972,9 +983,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
{ // file info { // file info
// TODO: not overwrite fi? since same? // TODO: not overwrite fi? since same?
auto& file_info = o.emplace_or_replace<Message::Components::Transfer::FileInfo>(); auto& file_info = o.emplace_or_replace<ObjComp::F::SingleInfo>(ft_info.file_name, ft_info.file_size);
file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size}; //auto& file_info = o.emplace_or_replace<Message::Components::Transfer::FileInfo>();
file_info.total_size = ft_info.file_size; //file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
//file_info.total_size = ft_info.file_size;
} }
std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n"; std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";
@ -984,7 +996,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
_queue_content_want_info.erase(it); _queue_content_want_info.erase(it);
} }
o.emplace_or_replace<Message::Components::Transfer::TagPaused>(); o.emplace_or_replace<ObjComp::Ephemeral::File::TagTransferPaused>();
updateMessages(o); updateMessages(o);
} else if (transfer.isChunk()) { } else if (transfer.isChunk()) {
@ -1000,7 +1012,12 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
const auto chunk_size = info.chunkSize(chunk_index); const auto chunk_size = info.chunkSize(chunk_index);
assert(offset_into_file+chunk_size <= info.file_size); assert(offset_into_file+chunk_size <= info.file_size);
const auto chunk_data = o.get<Message::Components::Transfer::File>()->read(chunk_size, offset_into_file); auto* file2 = objGetFile2Read(o);
if (file2 == nullptr) {
// rip
return false;
}
auto chunk_data = std::move(file2->read(chunk_size, offset_into_file));
assert(!chunk_data.empty()); assert(!chunk_data.empty());
// check hash of chunk // check hash of chunk
@ -1008,35 +1025,43 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
if (info.chunks.at(chunk_index) == got_hash) { if (info.chunks.at(chunk_index) == got_hash) {
std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n"; std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
if (!cc.have_all) { if (!o.all_of<ObjComp::F::TagLocalHaveAll>()) {
for (const auto inner_chunk_index : transfer.getChunk().chunk_indices) { {
if (!cc.have_all && !cc.have_chunk[inner_chunk_index]) { auto& lhb = o.get_or_emplace<ObjComp::F::LocalHaveBitset>(info.chunks.size());
cc.have_chunk.set(inner_chunk_index); for (const auto inner_chunk_index : transfer.getChunk().chunk_indices) {
if (lhb.have[inner_chunk_index]) {
continue;
}
// new good chunk
lhb.have.set(inner_chunk_index);
cc.have_count += 1; cc.have_count += 1;
// TODO: have wasted + metadata
//o.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size;
// we already tallied all of them but maybe we want to set some other progress indicator here?
if (cc.have_count == info.chunks.size()) { if (cc.have_count == info.chunks.size()) {
// debug check // debug check
for ([[maybe_unused]] size_t i = 0; i < info.chunks.size(); i++) { for ([[maybe_unused]] size_t i = 0; i < info.chunks.size(); i++) {
assert(cc.have_chunk[i]); assert(lhb.have[i]);
} }
cc.have_all = true; o.emplace_or_replace<ObjComp::F::TagLocalHaveAll>();
cc.have_chunk = BitSet(0); // not wasting memory
std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n"; std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";
// HACK: remap file, to clear ram // HACK: close file2, to clear ram
// TODO: just add a lastActivity comp and close files every x minutes based on that
// TODO: error checking file2 = nullptr; // making sure we dont have a stale ptr
o.get<Message::Components::Transfer::File>() = construct_file2_rw_mapped( o.remove<Components::FT1File2>(); // will be recreated on demand
o.get<Message::Components::Transfer::FileInfoLocal>().file_list.front(), break;
info.file_size
);
} }
// good chunk
// TODO: have wasted + metadata
o.get_or_emplace<Message::Components::Transfer::BytesReceived>().total += chunk_data.size;
} }
} }
if (o.all_of<ObjComp::F::TagLocalHaveAll>()) {
o.remove<ObjComp::F::LocalHaveBitset>(); // save space
}
// queue chunk have for all participants // queue chunk have for all participants
// HACK: send immediatly to all participants // HACK: send immediatly to all participants
@ -1062,20 +1087,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
chunk_indices.data(), chunk_indices.size() chunk_indices.data(), chunk_indices.size()
); );
} }
#if 0
if (!cc.have_all) { // debug print self have set
std::cout << "DEBUG print have bitset: s:" << cc.have_chunk.size_bits();
for (size_t i = 0; i < cc.have_chunk.size_bytes(); i++) {
if (i % 32 == 0) {
printf("\n");
}
// f cout
printf("%.2x", (uint16_t)cc.have_chunk.data()[i]);
}
printf("\n");
}
#endif
} else { } else {
std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n"; std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
} }
@ -1125,7 +1136,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
return false; return false;
} }
uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); uint64_t ts = Message::getTimeMS();
const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround _tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
@ -1152,7 +1163,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
reg.emplace<Message::Components::ToxGroupMessageID>(new_msg_e, e.message_id); reg.emplace<Message::Components::ToxGroupMessageID>(new_msg_e, e.message_id);
reg.emplace<Message::Components::Transfer::TagReceiving>(new_msg_e); // add sending? //reg.emplace<Message::Components::Transfer::TagReceiving>(new_msg_e); // add sending?
reg.emplace<Message::Components::TimestampProcessed>(new_msg_e, ts); reg.emplace<Message::Components::TimestampProcessed>(new_msg_e, ts);
//reg.emplace<Components::TimestampWritten>(new_msg_e, 0); //reg.emplace<Components::TimestampWritten>(new_msg_e, 0);
@ -1174,58 +1185,39 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
// check if content exists // check if content exists
const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size}; const auto sha1_info_hash = std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size};
ObjectHandle ce; ObjectHandle o;
if (_info_to_content.count(sha1_info_hash)) { if (_info_to_content.count(sha1_info_hash)) {
ce = _info_to_content.at(sha1_info_hash); o = _info_to_content.at(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has existing content\n"; std::cout << "SHA1_NGCFT1: new message has existing content\n";
} else { } else {
// TODO: backend // TODO: backend
ce = {_os.registry(), _os.registry().create()}; o = _mfb.newObject(ByteSpan{sha1_info_hash});
_info_to_content[sha1_info_hash] = ce; _info_to_content[sha1_info_hash] = o;
o.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
std::cout << "SHA1_NGCFT1: new message has new content\n"; std::cout << "SHA1_NGCFT1: new message has new content\n";
//ce.emplace<Components::FT1InfoSHA1>(sha1_info);
//ce.emplace<Components::FT1InfoSHA1Data>(sha1_info_data); // keep around? or file?
ce.emplace<Components::FT1InfoSHA1Hash>(sha1_info_hash);
} }
ce.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e}); o.get_or_emplace<Components::Messages>().messages.push_back({reg, new_msg_e});
reg_ptr->emplace<Message::Components::Content>(new_msg_e, ce); reg_ptr->emplace<Message::Components::MessageFileObject>(new_msg_e, o);
// HACK: assume the message sender is participating. usually a safe bet. // HACK: assume the message sender is participating. usually a safe bet.
if (addParticipation(c, ce)) { if (addParticipation(c, o)) {
// something happend, update chunk picker // something happend, update chunk picker
assert(static_cast<bool>(c)); assert(static_cast<bool>(c));
c.emplace_or_replace<ChunkPickerUpdateTag>(); c.emplace_or_replace<ChunkPickerUpdateTag>();
} }
// HACK: assume the message sender has all // HACK: assume the message sender has all
ce.get_or_emplace<Components::RemoteHave>().others[c] = {true, {}}; o.get_or_emplace<Components::RemoteHaveBitset>().others[c] = {true, {}};
if (!ce.all_of<Components::ReRequestInfoTimer>() && !ce.all_of<Components::FT1InfoSHA1>()) { if (!o.all_of<Components::ReRequestInfoTimer>() && !o.all_of<Components::FT1InfoSHA1>()) {
// TODO: check if already receiving // TODO: check if already receiving
_queue_content_want_info.push_back(ce); _queue_content_want_info.push_back(o);
} }
// since public
o.get_or_emplace<Components::AnnounceTargets>().targets.emplace(c.get<Contact::Components::Parent>().parent);
// TODO: queue info dl // TODO: queue info dl
//reg_ptr->emplace<Components::FT1InfoSHA1>(e, sha1_info);
//reg_ptr->emplace<Components::FT1InfoSHA1Data>(e, sha1_info_data); // keep around? or file?
//reg.emplace<Components::FT1InfoSHA1Hash>(new_msg_e, std::vector<uint8_t>{e.file_id, e.file_id+e.file_id_size});
if (auto* cc = ce.try_get<Components::FT1ChunkSHA1Cache>(); cc != nullptr && cc->have_all) {
reg_ptr->emplace<Message::Components::Transfer::TagHaveAll>(new_msg_e);
}
if (ce.all_of<Message::Components::Transfer::FileInfo>()) {
reg_ptr->emplace<Message::Components::Transfer::FileInfo>(new_msg_e, ce.get<Message::Components::Transfer::FileInfo>());
}
if (ce.all_of<Message::Components::Transfer::FileInfoLocal>()) {
reg_ptr->emplace<Message::Components::Transfer::FileInfoLocal>(new_msg_e, ce.get<Message::Components::Transfer::FileInfoLocal>());
}
if (ce.all_of<Message::Components::Transfer::BytesSent>()) {
reg_ptr->emplace<Message::Components::Transfer::BytesSent>(new_msg_e, ce.get<Message::Components::Transfer::BytesSent>());
}
// TODO: queue info/check if we already have info // TODO: queue info/check if we already have info
_rmm.throwEventConstruct(reg, new_msg_e); _rmm.throwEventConstruct(reg, new_msg_e);
@ -1249,7 +1241,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
} }
// get current time unix epoch utc // get current time unix epoch utc
uint64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); uint64_t ts = Message::getTimeMS();
_mfb.newFromFile( _mfb.newFromFile(
file_name, file_path, file_name, file_path,
@ -1301,8 +1293,8 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
for (const auto& [_, o] : _info_to_content) { for (const auto& [_, o] : _info_to_content) {
removeParticipation(c, o); removeParticipation(c, o);
if (o.all_of<Components::RemoteHave>()) { if (o.all_of<Components::RemoteHaveBitset>()) {
o.get<Components::RemoteHave>().others.erase(c); o.get<Components::RemoteHaveBitset>().others.erase(c);
} }
} }
} }
@ -1356,10 +1348,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
c.emplace_or_replace<ChunkPickerUpdateTag>(); c.emplace_or_replace<ChunkPickerUpdateTag>();
} }
auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others; auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
if (!remote_have.contains(c)) { if (!remote_have.contains(c)) {
// init // init
remote_have.emplace(c, Components::RemoteHave::Entry{false, num_total_chunks}); remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
// new have? nice // new have? nice
// (always update on biset, not always on have) // (always update on biset, not always on have)
@ -1443,10 +1435,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
// we might not know yet // we might not know yet
addParticipation(c, o); addParticipation(c, o);
auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others; auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
if (!remote_have.contains(c)) { if (!remote_have.contains(c)) {
// init // init
remote_have.emplace(c, Components::RemoteHave::Entry{false, num_total_chunks}); remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
} }
auto& remote_have_peer = remote_have.at(c); auto& remote_have_peer = remote_have.at(c);
@ -1507,8 +1499,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have_all& e) {
// we might not know yet // we might not know yet
addParticipation(c, o); addParticipation(c, o);
auto& remote_have = o.get_or_emplace<Components::RemoteHave>().others; auto& remote_have = o.get_or_emplace<Components::RemoteHaveBitset>().others;
remote_have[c] = Components::RemoteHave::Entry{true, {}}; remote_have[c] = Components::RemoteHaveBitset::Entry{true, {}};
// new have? nice // new have? nice
// (always update on have_all, not always on have) // (always update on have_all, not always on have)

View File

@ -20,7 +20,7 @@
#include <random> #include <random>
#include <chrono> #include <chrono>
class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public NGCFT1EventI, public NGCEXTEventI { class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public ObjectStoreEventI, public NGCFT1EventI, public NGCEXTEventI {
ObjectStore2& _os; ObjectStore2& _os;
// TODO: backend abstraction // TODO: backend abstraction
Contact3Registry& _cr; Contact3Registry& _cr;
@ -75,6 +75,9 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
void queueBitsetSendFull(Contact3Handle c, ObjectHandle o); void queueBitsetSendFull(Contact3Handle c, ObjectHandle o);
File2I* objGetFile2Write(ObjectHandle o);
File2I* objGetFile2Read(ObjectHandle o);
public: // TODO: config public: // TODO: config
bool _udp_only {false}; bool _udp_only {false};
@ -97,7 +100,10 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts); void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts);
protected: // rmm events (actions) protected: // rmm events (actions)
bool onEvent(const Message::Events::MessageUpdated&) override; bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override;
protected: // os events (actions)
bool onEvent(const ObjectStore::Events::ObjectUpdate&) override;
protected: // events protected: // events
bool onEvent(const Events::NGCFT1_recv_request&) override; bool onEvent(const Events::NGCFT1_recv_request&) override;
@ -108,8 +114,6 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
bool onEvent(const Events::NGCFT1_send_done&) override; bool onEvent(const Events::NGCFT1_send_done&) override;
bool onEvent(const Events::NGCFT1_recv_message&) override; bool onEvent(const Events::NGCFT1_recv_message&) override;
bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override;
bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override; bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override; bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;

View File

@ -1,6 +1,8 @@
#include "./transfer_stats_systems.hpp" #include "./transfer_stats_systems.hpp"
#include "./components.hpp" #include "./components.hpp"
#include <solanaceae/object_store/meta_components_file.hpp>
#include <iostream> #include <iostream>
namespace Systems { namespace Systems {
@ -88,7 +90,7 @@ void transfer_tally_update(ObjectRegistry& os_reg, const float time_now) {
// for each stats separated -> stats (total) // for each stats separated -> stats (total)
os_reg.view<Components::TransferStatsSeparated, Components::TransferStatsTally>().each([&os_reg](const auto ov, Components::TransferStatsSeparated& tss_comp, const auto&) { os_reg.view<Components::TransferStatsSeparated, Components::TransferStatsTally>().each([&os_reg](const auto ov, Components::TransferStatsSeparated& tss_comp, const auto&) {
Components::TransferStats& stats = os_reg.get_or_emplace<Components::TransferStats>(ov); auto& stats = os_reg.get_or_emplace<ObjComp::Ephemeral::File::TransferStats>(ov);
stats = {}; // reset stats = {}; // reset
for (const auto& [_, peer_stats] : tss_comp.stats) { for (const auto& [_, peer_stats] : tss_comp.stats) {