diff --git a/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.cpp b/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.cpp
index b652921..3daed84 100644
--- a/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.cpp
+++ b/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.cpp
@@ -1,6 +1,7 @@
 #include "./sha1_mapped_filesystem.hpp"
 
 #include
+#include
 
 #include "../file_constructor.hpp"
 #include "../ft1_sha1_info.hpp"
@@ -138,10 +139,7 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
 				o = {_os.registry(), it_ov};
 			}
 		}
-		//if (self->info_to_content.count(sha1_info_hash)) {
 		if (static_cast(o)) {
-			//ce = self->info_to_content.at(sha1_info_hash);
-			// TODO: check if content is incomplete and use file instead
 			if (!o.all_of()) {
 				o.emplace(sha1_info);
@@ -153,32 +151,9 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
 			// hash has to be set already
 			// Components::FT1InfoSHA1Hash
 
-			{ // lookup tables and have
-				auto& cc = o.get_or_emplace();
-				cc.have_all = true;
-				// skip have vec, since all
-				//cc.have_chunk
-				cc.have_count = sha1_info.chunks.size(); // need?
-
-				//self->_info_to_content[sha1_info_hash] = ce;
-				cc.chunk_hash_to_index.clear(); // for cpy pst
-				for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
-					//self->_chunks[sha1_info.chunks[i]] = ce;
-					cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
-				}
-			}
-
-			{ // file info
-				// TODO: not overwrite fi? since same?
-				auto& file_info = o.emplace_or_replace();
-				file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
-				file_info.total_size = file_impl->_file_size;
-
-				o.emplace_or_replace(std::vector{std::string{file_path_}});
-			}
-			// hmmm
-			o.remove();
+			// TODO: we need a replacement for this
+			o.remove(); // we dont want the info anymore
 
 			o.remove();
@@ -188,32 +163,33 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
 			o.emplace(sha1_info);
 			o.emplace(sha1_info_data); // keep around? or file?
 			o.emplace(sha1_info_hash);
-			{ // lookup tables and have
-				auto& cc = o.emplace();
-				cc.have_all = true;
-				// skip have vec, since all
-				cc.have_count = sha1_info.chunks.size(); // need?
+		}
 
-				cc.chunk_hash_to_index.clear(); // for cpy pst
-				for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
-					cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
-				}
-			}
+		{ // lookup tables and have
+			auto& cc = o.get_or_emplace();
+			// skip have vec, since all
+			cc.have_count = sha1_info.chunks.size(); // need?
 
-			{ // file info
-				auto& file_info = o.emplace();
-				file_info.file_list.emplace_back() = {std::string{file_name_}, file_impl->_file_size};
-				file_info.total_size = file_impl->_file_size;
-
-				o.emplace(std::vector{std::string{file_path_}});
+			cc.chunk_hash_to_index.clear(); // for cpy pst
+			for (size_t i = 0; i < sha1_info.chunks.size(); i++) {
+				cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i);
 			}
 		}
 
-		o.emplace_or_replace(std::move(file_impl));
+		o.emplace_or_replace();
+		o.remove();
 
-		// TODO: replace with transfers stats
-		if (!o.all_of()) {
-			o.emplace(0u);
+		{ // file info
+			// TODO: not overwrite fi? since same?
+			o.emplace_or_replace(file_name_, file_impl->_file_size);
+			o.emplace_or_replace(file_path_);
+			o.emplace_or_replace(file_path_); // ?
+		}
+
+		o.emplace_or_replace(std::move(file_impl));
+
+		if (!o.all_of()) {
+			o.emplace();
 		}
 
 		cb(o);
@@ -226,23 +202,33 @@ void SHA1MappedFilesystem::newFromFile(std::string_view file_name, std::string_v
 }
 
 std::unique_ptr SHA1MappedFilesystem::file2(Object ov, FILE2_FLAGS flags) {
+	if (flags & FILE2_RAW) {
+		std::cerr << "SHA1MF error: does not support raw modes\n";
+		return nullptr;
+	}
+
 	ObjectHandle o{_os.registry(), ov};
 
 	if (!static_cast(o)) {
 		return nullptr;
 	}
 
-	if (!o.all_of()) {
+	// will this do if we go and support enc?
+	// use ObjComp::Ephemeral::FilePath instead??
+	if (!o.all_of()) {
 		return nullptr;
 	}
 
-	const auto& file_list = o.get().file_list;
-	if (file_list.empty()) {
+	const auto& file_path = o.get().file_path;
+	if (file_path.empty()) {
 		return nullptr;
 	}
 
-	auto res = construct_file2_rw_mapped(file_list.front(), -1);
+	// TODO: read-only one too
+	// since they are mapped, is this efficent to have multiple?
+	auto res = construct_file2_rw_mapped(file_path, -1);
 	if (!res || !res->isGood()) {
+		std::cerr << "SHA1MF error: failed constructing mapped file '" << file_path << "'\n";
 		return nullptr;
 	}
diff --git a/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.hpp b/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.hpp
index dc3fd56..d2db2df 100644
--- a/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.hpp
+++ b/solanaceae/ngc_ft1_sha1/backends/sha1_mapped_filesystem.hpp
@@ -32,7 +32,7 @@ struct SHA1MappedFilesystem : public StorageBackendI {
 	// might return pre-existing?
 	ObjectHandle newFromInfoHash(ByteSpan info_hash);
 
-	std::unique_ptr file2(Object o, FILE2_FLAGS flags); // default does nothing
+	std::unique_ptr file2(Object o, FILE2_FLAGS flags) override;
 };
 
 } // Backends
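Aside, not part of the patch: the reworked file2() above now rejects raw modes, resolves the path from the single-file path component and memory-maps it. A minimal sketch of the consumer side, assuming only the File2I interface (isGood(), can_write, write()) and the FILE2_WRITE flag as they appear in this diff; backend, obj, chunk_data, chunk_index and chunk_size are placeholders:

// Sketch: writing one chunk through a backend-provided file2.
std::unique_ptr<File2I> file = backend.file2(obj, StorageBackendI::FILE2_WRITE);
if (!file || !file->isGood() || !file->can_write) {
	// raw mode requested, missing path component or mmap failure -> backend handed back nothing usable
	return;
}
const uint64_t offset_into_file = uint64_t(chunk_index) * chunk_size;
if (!file->write(chunk_data, offset_into_file)) {
	std::cerr << "write failed at " << offset_into_file << "\n";
}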
diff --git a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
index 27f7c0b..d105195 100644
--- a/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
+++ b/solanaceae/ngc_ft1_sha1/chunk_picker.cpp
@@ -1,10 +1,11 @@
 #include "./chunk_picker.hpp"
 
 #include
-
-#include "./components.hpp"
 #include "./contact_components.hpp"
 
+#include
+#include "./components.hpp"
+
 #include
 
 #include
@@ -140,37 +141,39 @@ void ChunkPicker::updateParticipation(
 				continue;
 			}
 
-			if (o.all_of()) {
+			if (o.all_of()) {
 				participating_unfinished.erase(o);
 				continue;
 			}
 
-			if (o.get().have_all) {
+			if (o.all_of()) {
 				participating_unfinished.erase(o);
+				continue;
 			}
 		} else {
 			if (!o.all_of()) {
 				continue;
 			}
 
-			if (o.all_of()) {
+			if (o.all_of()) {
 				continue;
 			}
 
-			if (!o.get().have_all) {
-				using Priority = Components::DownloadPriority::Priority;
+			if (!o.all_of()) {
+				//using Priority = Components::DownloadPriority::Priority;
+				using Priority = ObjComp::Ephemeral::File::DownloadPriority::Priority;
 				Priority prio = Priority::NORMAL;
 
-				if (o.all_of()) {
-					prio = o.get().p;
+				if (o.all_of()) {
+					prio = o.get().p;
 				}
 
 				uint16_t pskips =
-					prio == Priority::HIGHER ? 0u :
+					prio == Priority::HIGHEST ? 0u :
 					prio == Priority::HIGH    ? 1u :
 					prio == Priority::NORMAL  ? 2u :
 					prio == Priority::LOW     ? 4u :
-					8u
+					8u // LOWEST
 				;
 
 				participating_unfinished.emplace(o, ParticipationEntry{pskips});
@@ -255,18 +258,19 @@ std::vector ChunkPicker::updateChunkRequests(
 		ObjectHandle o {objreg, it->first};
 
 		// intersect self have with other have
-		if (!o.all_of()) {
+		if (!o.all_of()) {
 			// rare case where no one else has anything
 			continue;
 		}
 
-		const auto& cc = o.get();
-		if (cc.have_all) {
+		if (o.all_of()) {
 			std::cerr << "ChunkPicker error: completed content still in participating_unfinished!\n";
 			continue;
 		}
 
-		const auto& others_have = o.get().others;
+		//const auto& cc = o.get();
+
+		const auto& others_have = o.get().others;
 		auto other_it = others_have.find(c);
 		if (other_it == others_have.end()) {
 			// rare case where the other is participating but has nothing
@@ -275,7 +279,14 @@ std::vector ChunkPicker::updateChunkRequests(
 
 		const auto& other_have = other_it->second;
 
-		BitSet chunk_candidates = cc.have_chunk;
+		const auto& info = o.get();
+		const auto total_chunks = info.chunks.size();
+
+		const auto* lhb = o.try_get();
+
+		// if we dont have anything, this might not exist yet
+		BitSet chunk_candidates = lhb == nullptr ? BitSet{total_chunks} : lhb->have;
+
 		if (!other_have.have_all) {
 			// AND is the same as ~(~A | ~B)
 			// that means we leave chunk_candidates as (have is inverted want)
@@ -288,8 +299,6 @@
 		} else {
 			chunk_candidates.invert();
 		}
-		const auto& info = o.get();
-		const auto total_chunks = info.chunks.size();
 
 		auto& requested_chunks = o.get_or_emplace().chunks;
 		// TODO: trim off round up to 8, since they are now always set
@@ -306,8 +315,8 @@
 		// TODO: configurable
 		size_t start_offset {0u};
-		if (o.all_of()) {
-			const auto byte_offset = o.get().offset_into_file;
+		if (o.all_of()) {
+			const auto byte_offset = o.get().offset_into_file;
 			if (byte_offset <= info.file_size) {
 				start_offset = byte_offset/info.chunk_size;
 			} else {
diff --git a/solanaceae/ngc_ft1_sha1/components.cpp b/solanaceae/ngc_ft1_sha1/components.cpp
index 26c6e82..a89f39a 100644
--- a/solanaceae/ngc_ft1_sha1/components.cpp
+++ b/solanaceae/ngc_ft1_sha1/components.cpp
@@ -1,5 +1,7 @@
 #include "./components.hpp"
 
+#include
+
 namespace Components {
 
 std::vector FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) const {
@@ -11,14 +13,20 @@ std::vector FT1ChunkSHA1Cache::chunkIndices(const SHA1Digest& hash) cons
 	}
 }
 
-bool FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const {
-	if (have_all) { // short cut
+bool FT1ChunkSHA1Cache::haveChunk(ObjectHandle o, const SHA1Digest& hash) const {
+	if (o.all_of()) {
 		return true;
 	}
 
+	const auto* lhb = o.try_get();
+	if (lhb == nullptr) {
+		return false; // we dont have anything yet
+	}
+
 	if (auto i_vec = chunkIndices(hash); !i_vec.empty()) {
 		// TODO: should i test all?
-		return have_chunk[i_vec.front()];
+		//return have_chunk[i_vec.front()];
+		return lhb->have[i_vec.front()];
 	}
 
 	// not part of this file
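Aside, not part of the patch: the haveChunk() change above moves the have state off the cache struct and onto the object itself — a "have all" tag component short-circuits, otherwise a local have bitset is consulted. A sketch of that lookup pattern with hypothetical component names (the real template arguments are stripped in this paste):

// Hypothetical names; the patch only shows the access pattern, not the component types.
struct LocalHaveAll {};                   // tag: object is complete
struct LocalHaveBitset { BitSet have; };  // one bit per chunk otherwise

bool haveChunkIndex(ObjectHandle o, size_t chunk_index) {
	if (o.all_of<LocalHaveAll>()) {
		return true; // short cut, the bitset is dropped once complete
	}
	const auto* lhb = o.try_get<LocalHaveBitset>();
	return lhb != nullptr && lhb->have[chunk_index]; // no bitset yet means we have nothing
}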
diff --git a/solanaceae/ngc_ft1_sha1/components.hpp b/solanaceae/ngc_ft1_sha1/components.hpp
index 740a46c..bb93275 100644
--- a/solanaceae/ngc_ft1_sha1/components.hpp
+++ b/solanaceae/ngc_ft1_sha1/components.hpp
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include
 
 #include
 
@@ -35,18 +36,24 @@ namespace Components {
 	};
 
 	struct FT1ChunkSHA1Cache {
-		// TODO: extract have_chunk, have_all and have_count to generic comp
+		// TODO: extract have_count to generic comp
 		// have_chunk is the size of info.chunks.size(), or empty if have_all
 		// keep in mind bitset rounds up to 8s
-		BitSet have_chunk{0};
+		//BitSet have_chunk{0};
 
-		bool have_all {false};
-		size_t have_count {0};
+		//bool have_all {false};
+		size_t have_count {0}; // move?
 
 		entt::dense_map> chunk_hash_to_index;
 
 		std::vector chunkIndices(const SHA1Digest& hash) const;
-		bool haveChunk(const SHA1Digest& hash) const;
+		bool haveChunk(ObjectHandle o, const SHA1Digest& hash) const;
+	};
+
+	struct FT1File2 {
+		// the cached file2 for faster access
+		// should be destroyed when no activity and recreated on demand
+		std::unique_ptr file;
 	};
 
 	struct FT1ChunkSHA1Requested {
@@ -63,7 +70,7 @@ namespace Components {
 		entt::dense_set participants;
 	};
 
-	struct RemoteHave {
+	struct RemoteHaveBitset {
 		struct Entry {
 			bool have_all {false};
 			BitSet have;
@@ -92,41 +99,8 @@ namespace Components {
 		void lower(void);
 	};
 
-	struct DownloadPriority {
-		// download/retreival priority in comparison to other objects
-		// not all backends implement this
-		// priority can be weak, meaning low priority dls will still get transfer activity, just less often
-		enum class Priority {
-			HIGHER,
-			HIGH,
-			NORMAL,
-			LOW,
-			LOWER,
-		} p = Priority::NORMAL;
-	};
-
-	struct ReadHeadHint {
-		// points to the first byte we want
-		// this is just a hint, that can be set from outside
-		// to guide the sequential "piece picker" strategy
-		// ? the strategy *should* set this to the first byte we dont yet have
-		uint64_t offset_into_file {0u};
-	};
-
-	// this is per object/content
-	// more aplicable than "separated", so should be supported by most backends
-	struct TransferStats {
-		// in bytes per second
-		float rate_up {0.f};
-		float rate_down {0.f};
-
-		// bytes
-		uint64_t total_up {0u};
-		uint64_t total_down {0u};
-	};
-
 	struct TransferStatsSeparated {
-		entt::dense_map stats;
+		entt::dense_map stats;
 	};
 
 	// used to populate stats
diff --git a/solanaceae/ngc_ft1_sha1/re_announce_systems.cpp b/solanaceae/ngc_ft1_sha1/re_announce_systems.cpp
index 124baa0..e853ad0 100644
--- a/solanaceae/ngc_ft1_sha1/re_announce_systems.cpp
+++ b/solanaceae/ngc_ft1_sha1/re_announce_systems.cpp
@@ -1,7 +1,7 @@
 #include "./re_announce_systems.hpp"
 
 #include "./components.hpp"
-#include
+#include
 #include
 #include
 #include
@@ -18,11 +18,12 @@ void re_announce(
 	std::vector to_remove;
 	os_reg.view().each([&os_reg, &cr, &neep, &to_remove, delta](Object ov, Components::ReAnnounceTimer& rat) {
 		ObjectHandle o{os_reg, ov};
-		// if paused -> remove
-		if (o.all_of()) {
-			to_remove.push_back(ov);
-			return;
-		}
+		// TODO: pause
+		//// if paused -> remove
+		//if (o.all_of()) {
+		//	to_remove.push_back(ov);
+		//	return;
+		//}
 
 		// if not downloading or info incomplete -> remove
 		if (!o.all_of()) {
@@ -31,7 +32,7 @@ void re_announce(
 			return;
 		}
 
-		if (o.get().have_all) {
+		if (o.all_of()) {
 			// transfer done, we stop announcing
 			to_remove.push_back(ov);
 			return;
diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
index 21c8230..074fcdb 100644
--- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
+++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include
 
 #include "./util.hpp"
 
@@ -31,12 +32,6 @@
 #include
 #include
 
-namespace Message::Components {
-
-	using Content = ObjectHandle;
-
-} // Message::Components
-
 static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) {
 	if (chunk_index+1 == sha1_info.chunks.size()) {
 		// last chunk
@@ -70,30 +65,15 @@ void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_numbe
 	_queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, obj, hash, 0.f));
 }
 
-void SHA1_NGCFT1::updateMessages(ObjectHandle ce) {
-	assert(ce.all_of());
+void SHA1_NGCFT1::updateMessages(ObjectHandle o) {
+	assert(o.all_of());
 
-	for (auto msg : ce.get().messages) {
-		if (ce.all_of() && !msg.all_of()) {
-			msg.emplace(ce.get());
-		}
-		if (ce.all_of()) {
-			msg.emplace_or_replace(ce.get());
-		}
-		if (ce.all_of()) {
-			msg.emplace_or_replace(ce.get());
-		}
-		if (ce.all_of()) {
-			msg.emplace_or_replace(ce.get());
-		}
-		if (ce.all_of()) {
-			msg.emplace_or_replace();
-		} else {
-			msg.remove();
-		}
-		if (auto* cc = ce.try_get(); cc != nullptr && cc->have_all) {
-			msg.emplace_or_replace();
-		}
+	for (auto msg : o.get().messages) {
+		msg.emplace_or_replace(o);
+
+		// messages no long hold this info
+		// this should not update messages anymore but simply just update the object
+		// and receivers should listen for object updates (?)
 
 		_rmm.throwEventUpdate(msg);
 	}
@@ -177,6 +157,40 @@ void SHA1_NGCFT1::queueBitsetSendFull(Contact3Handle c, ObjectHandle o) {
 	_queue_send_bitset.push_back(QBitsetEntry{c, o});
 }
 
+File2I* SHA1_NGCFT1::objGetFile2Write(ObjectHandle o) {
+	auto* file2_comp_ptr = o.try_get();
+	if (file2_comp_ptr == nullptr || !file2_comp_ptr->file || !file2_comp_ptr->file->can_write || !file2_comp_ptr->file->isGood()) {
+		// (re)request file2 from backend
+		auto new_file = _mfb.file2(o, StorageBackendI::FILE2_WRITE);
+		if (!new_file || !new_file->can_write || !new_file->isGood()) {
+			std::cerr << "SHA1_NGCFT1 error: failed to open object for writing\n";
+			return nullptr; // early out
+		}
+		file2_comp_ptr = &o.emplace_or_replace(std::move(new_file));
+	}
+	assert(file2_comp_ptr != nullptr);
+	assert(static_cast(file2_comp_ptr->file));
+
+	return file2_comp_ptr->file.get();
+}
+
+File2I* SHA1_NGCFT1::objGetFile2Read(ObjectHandle o) {
+	auto* file2_comp_ptr = o.try_get();
+	if (file2_comp_ptr == nullptr || !file2_comp_ptr->file || !file2_comp_ptr->file->can_read || !file2_comp_ptr->file->isGood()) {
+		// (re)request file2 from backend
+		auto new_file = _mfb.file2(o, StorageBackendI::FILE2_READ);
+		if (!new_file || !new_file->can_read || !new_file->isGood()) {
+			std::cerr << "SHA1_NGCFT1 error: failed to open object for reading\n";
+			return nullptr; // early out
+		}
+		file2_comp_ptr = &o.emplace_or_replace(std::move(new_file));
+	}
+	assert(file2_comp_ptr != nullptr);
+	assert(static_cast(file2_comp_ptr->file));
+
+	return file2_comp_ptr->file.get();
+}
+
 SHA1_NGCFT1::SHA1_NGCFT1(
 	ObjectStore2& os,
 	Contact3Registry& cr,
@@ -196,9 +210,9 @@ SHA1_NGCFT1::SHA1_NGCFT1(
 	_mfb(os)
 {
 	// TODO: also create and destroy
-	_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
-	//_rmm.subscribe(this, RegistryMessageModel_Event::message_construct);
-	//_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);
+	//_os.subscribe(this, ObjectStore_Event::object_construct);
+	_os.subscribe(this, ObjectStore_Event::object_update);
+	//_os.subscribe(this, ObjectStore_Event::object_destroy);
 
 	_nft.subscribe(this, NGCFT1_Event::recv_request);
 	_nft.subscribe(this, NGCFT1_Event::recv_init);
@@ -289,20 +303,19 @@ float SHA1_NGCFT1::iterate(float delta) {
 		if (static_cast(qe.o) && static_cast(qe.c) && qe.c.all_of() && qe.o.all_of()) {
 			const auto [group_number, peer_number] = qe.c.get();
 			const auto& info_hash = qe.o.get().hash;
-			const auto& cc = qe.o.get();
 			const auto& info = qe.o.get();
 			const auto total_chunks = info.chunks.size();
 
 			static constexpr size_t bits_per_packet {8u*512u};
 
-			if (cc.have_all) {
+			if (qe.o.all_of()) {
 				// send have all
 				_neep.send_ft1_have_all(
 					group_number, peer_number,
 					static_cast(NGCFT1_file_kind::HASH_SHA1_INFO),
 					info_hash.data(), info_hash.size()
 				);
-			} else {
+			} else if (const auto* lhb = qe.o.try_get(); lhb != nullptr) {
 				for (size_t i = 0; i < total_chunks; i += bits_per_packet) {
 					size_t bits_this_packet = std::min(bits_per_packet, total_chunks-i);
@@ -310,7 +323,7 @@ float SHA1_NGCFT1::iterate(float delta) {
 
 					// TODO: optimize selective copy bitset
 					for (size_t j = i; j < i+bits_this_packet; j++) {
-						if (cc.have_chunk[j]) {
+						if (lhb->have[j]) {
 							have.set(j-i);
 						}
 					}
@@ -324,7 +337,7 @@ float SHA1_NGCFT1::iterate(float delta) {
 						have._bytes.data(), have.size_bytes()
 					);
 				}
-			}
+			} // else, we have nothing *shrug*
 		}
 
 		_queue_send_bitset.pop_front();
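Aside, not part of the patch: objGetFile2Write()/objGetFile2Read() above add a small get-or-reopen cache around the backend — the FT1File2 component keeps the last handle and it is only re-requested when missing, not good, or opened without the needed capability. The same pattern in generic form, assuming only the names used in this diff (Components::FT1File2, StorageBackendI::FILE2_READ/FILE2_WRITE, can_read/can_write, isGood()):

// Sketch: lazy (re)open through the FT1File2 cache component.
File2I* getCachedFile2(ObjectHandle o, StorageBackendI& backend, bool for_write) {
	auto* comp = o.try_get<Components::FT1File2>();
	const bool usable =
		comp != nullptr && comp->file && comp->file->isGood() &&
		(for_write ? comp->file->can_write : comp->file->can_read);
	if (!usable) {
		auto new_file = backend.file2(o, for_write ? StorageBackendI::FILE2_WRITE : StorageBackendI::FILE2_READ);
		if (!new_file || !new_file->isGood()) {
			return nullptr; // caller handles the failure
		}
		comp = &o.emplace_or_replace<Components::FT1File2>(std::move(new_file));
	}
	return comp->file.get();
}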
@@ -466,6 +479,9 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
 		}
 	}
 
+	// in both cases, private and public, c (contact to) is the target
+	o.get_or_emplace().targets.emplace(c);
+
 	// create message
 	const auto c_self = _cr.get(c).self;
 	if (!_cr.valid(c_self)) {
@@ -479,8 +495,9 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
 	reg_ptr->emplace(msg_e, ts); // reactive?
 	reg_ptr->emplace(msg_e, ts);
 
-	reg_ptr->emplace(msg_e);
-	reg_ptr->emplace(msg_e);
+	reg_ptr->emplace(msg_e, o);
+
+	//reg_ptr->emplace(msg_e);
 
 	o.get_or_emplace().messages.push_back({*reg_ptr, msg_e});
 
@@ -510,29 +527,27 @@ void SHA1_NGCFT1::onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_p
 	_rmm.throwEventConstruct(*reg_ptr, msg_e);
 
 	// TODO: place in iterate?
-	updateMessages(o);
+	updateMessages(o); // nop
 
 	// TODO: remove
 }
 
-bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
-	// see tox_transfer_manager.cpp for reference
-	if (!e.e.all_of()) {
+bool SHA1_NGCFT1::onEvent(const ObjectStore::Events::ObjectUpdate& e) {
+	if (!e.e.all_of()) {
 		return false;
 	}
 
-	//accept(e.e, e.e.get().save_to_path);
-	auto ce = e.e.get();
-
-	//if (!ce.all_of()) {
-	if (!ce.all_of()) {
+	if (!e.e.all_of()) {
 		// not ready to load yet, skip
 		return false;
 	}
 
-	assert(!ce.all_of());
-	assert(!ce.all_of());
+	assert(!e.e.all_of());
+	assert(!e.e.all_of());
+	assert(!e.e.all_of());
+
+	//accept(e.e, e.e.get().save_to_path);
 
 	// first, open file for write(+readback)
-	std::string full_file_path{e.e.get().save_to_path};
+	std::string full_file_path{e.e.get().save_to_path}; // TODO: replace with filesystem or something
+	// TODO: use bool in action !!!
 	if (full_file_path.back() != '/') {
 		full_file_path += "/";
 	}
@@ -540,10 +555,10 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 	// ensure dir exists
 	std::filesystem::create_directories(full_file_path);
 
-	const auto& info = ce.get();
+	const auto& info = e.e.get();
 	full_file_path += info.file_name;
 
-	ce.emplace(std::vector{full_file_path});
+	e.e.emplace(full_file_path);
 
 	const bool file_exists = std::filesystem::exists(full_file_path);
 	std::unique_ptr file_impl = construct_file2_rw_mapped(full_file_path, info.file_size);
@@ -551,15 +566,17 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 	if (!file_impl->isGood()) {
 		std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n";
 		// we failed opening that filepath, so we should offer the user the oportunity to save it differently
-		e.e.remove(); // stop
+		e.e.remove(); // stop
 		return false;
 	}
 
 	{ // next, create chuck cache and check for existing data
-		auto& cc = ce.emplace();
-		auto& bytes_received = ce.get_or_emplace().total;
-		cc.have_chunk = BitSet(info.chunks.size());
-		cc.have_all = false;
+		auto& transfer_stats = e.e.get_or_emplace();
+		auto& lhb = e.e.get_or_emplace();
+		if (lhb.have.size_bytes() < info.chunks.size()/8) {
+			lhb.have = BitSet{info.chunks.size()};
+		}
+		auto& cc = e.e.emplace();
 		cc.have_count = 0;
 
 		cc.chunk_hash_to_index.clear(); // if copy pasta
@@ -576,9 +593,12 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 				const bool data_equal = data_hash == info.chunks.at(i);
 
 				if (data_equal) {
-					cc.have_chunk.set(i);
+					lhb.have.set(i);
 					cc.have_count += 1;
-					bytes_received += chunk_size;
+
+					// TODO: replace with some progress counter?
+					// or move have_count/want_count or something?
+					transfer_stats.total_down += chunk_size;
 					//std::cout << "existing i[" << info.chunks.at(i) << "] == d[" << data_hash << "]\n";
 				} else {
 					//std::cout << "unk i[" << info.chunks.at(i) << "] != d[" << data_hash << "]\n";
@@ -587,59 +607,45 @@ bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) {
 					// error reading?
 				}
 
-				_chunks[info.chunks[i]] = ce;
+				_chunks[info.chunks[i]] = e.e;
 				cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
 			}
 
 			std::cout << "preexisting " << cc.have_count << "/" << info.chunks.size() << "\n";
 
 			if (cc.have_count >= info.chunks.size()) {
-				cc.have_all = true;
-				//ce.remove();
+				e.e.emplace_or_replace();
+				e.e.remove();
 			}
 		} else {
 			for (size_t i = 0; i < info.chunks.size(); i++) {
-				_chunks[info.chunks[i]] = ce;
+				_chunks[info.chunks[i]] = e.e;
 				cc.chunk_hash_to_index[info.chunks[i]].push_back(i);
 			}
 		}
 	}
 
-	ce.emplace(std::move(file_impl));
+	e.e.emplace_or_replace(std::move(file_impl));
 
-	// queue announce we are participating
-	// since this is the first time, we publicly announce to all
-	if (e.e.all_of()) {
-		const auto c_f = e.e.get().c;
-		const auto c_t = e.e.get().c;
+	// queue announce that we are participating
+	e.e.get_or_emplace(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
 
-		if (_cr.all_of(c_t)) {
-			// public
-			ce.get_or_emplace().targets.emplace(c_t);
-		} else if (_cr.all_of(c_f)) {
-			// private ?
-			ce.get_or_emplace().targets.emplace(c_f);
-		}
-	}
-	ce.get_or_emplace(0.1f, 60.f*(_rng()%5120) / 1024.f).timer = (_rng()%512) / 1024.f;
-
-	ce.remove();
+	e.e.remove();
 
 	// start requesting from all participants
-	if (ce.all_of()) {
-		std::cout << "accepted ft has " << ce.get().participants.size() << " sp\n";
-		for (const auto cv : ce.get().participants) {
+	if (e.e.all_of()) {
+		std::cout << "accepted ft has " << e.e.get().participants.size() << " sp\n";
+		for (const auto cv : e.e.get().participants) {
 			_cr.emplace_or_replace(cv);
 		}
 	} else {
 		std::cout << "accepted ft has NO sp!\n";
 	}
 
-	// should?
-	e.e.remove();
+	e.e.remove();
 
-	updateMessages(ce);
+	updateMessages(e.e);
 
-	return false;
+	return false; // ?
 }
 
 bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
@@ -717,7 +723,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) {
 
 	assert(o.all_of());
 
-	if (!o.get().haveChunk(chunk_hash)) {
+	if (!o.get().haveChunk(o, chunk_hash)) {
 		// we dont have the chunk
 		return false;
 	}
@@ -793,7 +799,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) {
 	assert(o.all_of());
 
 	const auto& cc = o.get();
-	if (cc.haveChunk(sha1_chunk_hash)) {
+	if (cc.haveChunk(o, sha1_chunk_hash)) {
 		std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n";
 		// we have the chunk
 		return false;
	}
@@ -854,16 +860,16 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) {
 	} else if (transfer.isChunk()) {
 		auto o = transfer.getChunk().content;
 
-		assert(o.all_of());
-		auto* file = o.get().get();
-		assert(file != nullptr);
-
 		const auto chunk_size = o.get().chunk_size;
 
 		for (const auto chunk_index : transfer.getChunk().chunk_indices) {
 			const auto offset_into_file = chunk_index * chunk_size;
 
-			if (!file->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
-				std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << offset_into_file + e.data_offset << "\n";
+			auto* file2 = objGetFile2Write(o);
+			if (file2 == nullptr) {
+				return false; // early out
+			}
+			if (!file2->write({e.data, e.data_size}, offset_into_file + e.data_offset)) {
+				std::cerr << "SHA1_NGCFT1 error: writing file failed o:" << entt::to_integral(o.entity()) << "@" << offset_into_file + e.data_offset << "\n";
 			}
 		}
 
@@ -903,8 +909,14 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
 	} else if (transfer.isChunk()) {
 		auto& chunk_transfer = transfer.getChunk();
 		const auto& info = chunk_transfer.content.get();
-		// TODO: should we really use file?
-		const auto data = chunk_transfer.content.get()->read(
+
+		auto* file2 = objGetFile2Read(chunk_transfer.content);
+		if (file2 == nullptr) {
+			// return true?
+			return false; // early out
+		}
+
+		const auto data = file2->read(
 			e.data_size,
 			(chunk_transfer.chunk_index * uint64_t(info.chunk_size)) + e.data_offset
 		);
@@ -914,7 +926,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) {
 			e.data[i] = data[i];
 		}
 
-		chunk_transfer.content.get_or_emplace().total += data.size;
 		// TODO: add event to propergate to messages
 		//_rmm.throwEventUpdate(transfer); // should we?
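Aside, not part of the patch: the accept path above (the ObjectUpdate handler) recovers progress from a file that already exists on disk by hashing every chunk-sized slice and flipping the corresponding have bit. A sketch of that scan; computeSha1() is a stand-in for the project's SHA1 helper, which is not visible in these hunks:

// Sketch: rebuilding the local have bitset from preexisting file data.
for (size_t i = 0; i < info.chunks.size(); i++) {
	const auto chunk_size = info.chunkSize(i);
	const auto data = file_impl->read(chunk_size, i * uint64_t(info.chunk_size));
	if (data.empty()) {
		continue; // read error, treat as missing
	}
	const SHA1Digest data_hash = computeSha1(data); // hypothetical helper
	if (data_hash == info.chunks.at(i)) {
		lhb.have.set(i);      // mark chunk as locally available
		cc.have_count += 1;
	}
}
// if have_count reached info.chunks.size(), the object is already complete
// and the bitset can be replaced by the "have all" tag, as the hunk above does.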
@@ -972,9 +983,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 
 			{ // file info
 				// TODO: not overwrite fi? since same?
-				auto& file_info = o.emplace_or_replace();
-				file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
-				file_info.total_size = ft_info.file_size;
+				auto& file_info = o.emplace_or_replace(ft_info.file_name, ft_info.file_size);
+				//auto& file_info = o.emplace_or_replace();
+				//file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size};
+				//file_info.total_size = ft_info.file_size;
 			}
 
 			std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n";
@@ -984,7 +996,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 				_queue_content_want_info.erase(it);
 			}
 
-			o.emplace_or_replace();
+			o.emplace_or_replace();
 
 			updateMessages(o);
 		} else if (transfer.isChunk()) {
@@ -1000,7 +1012,12 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 			const auto chunk_size = info.chunkSize(chunk_index);
 			assert(offset_into_file+chunk_size <= info.file_size);
 
-			const auto chunk_data = o.get()->read(chunk_size, offset_into_file);
+			auto* file2 = objGetFile2Read(o);
+			if (file2 == nullptr) {
+				// rip
+				return false;
+			}
+			auto chunk_data = std::move(file2->read(chunk_size, offset_into_file));
 			assert(!chunk_data.empty());
 
 			// check hash of chunk
@@ -1008,35 +1025,43 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 			if (info.chunks.at(chunk_index) == got_hash) {
 				std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n";
 
-				if (!cc.have_all) {
-					for (const auto inner_chunk_index : transfer.getChunk().chunk_indices) {
-						if (!cc.have_all && !cc.have_chunk[inner_chunk_index]) {
-							cc.have_chunk.set(inner_chunk_index);
+				if (!o.all_of()) {
+					{
+						auto& lhb = o.get_or_emplace(info.chunks.size());
+						for (const auto inner_chunk_index : transfer.getChunk().chunk_indices) {
+							if (lhb.have[inner_chunk_index]) {
+								continue;
+							}
+
+							// new good chunk
+
+							lhb.have.set(inner_chunk_index);
 							cc.have_count += 1;
+
+							// TODO: have wasted + metadata
+							//o.get_or_emplace().total += chunk_data.size;
+							// we already tallied all of them but maybe we want to set some other progress indicator here?
+
 							if (cc.have_count == info.chunks.size()) {
 								// debug check
 								for ([[maybe_unused]] size_t i = 0; i < info.chunks.size(); i++) {
-									assert(cc.have_chunk[i]);
+									assert(lhb.have[i]);
 								}
 
-								cc.have_all = true;
-								cc.have_chunk = BitSet(0); // not wasting memory
+								o.emplace_or_replace();
 								std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n";
 
-								// HACK: remap file, to clear ram
-
-								// TODO: error checking
-								o.get() = construct_file2_rw_mapped(
-									o.get().file_list.front(),
-									info.file_size
-								);
+								// HACK: close file2, to clear ram
+								// TODO: just add a lastActivity comp and close files every x minutes based on that
+								file2 = nullptr; // making sure we dont have a stale ptr
+								o.remove(); // will be recreated on demand
+								break;
 							}
-
-							// good chunk
-							// TODO: have wasted + metadata
-							o.get_or_emplace().total += chunk_data.size;
 						}
 					}
 
+					if (o.all_of()) {
+						o.remove(); // save space
+					}
 
 					// queue chunk have for all participants
 					// HACK: send immediatly to all participants
@@ -1062,20 +1087,6 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) {
 							chunk_indices.data(), chunk_indices.size()
 						);
 					}
-
-#if 0
-					if (!cc.have_all) { // debug print self have set
-						std::cout << "DEBUG print have bitset: s:" << cc.have_chunk.size_bits();
-						for (size_t i = 0; i < cc.have_chunk.size_bytes(); i++) {
-							if (i % 32 == 0) {
-								printf("\n");
-							}
-							// f cout
-							printf("%.2x", (uint16_t)cc.have_chunk.data()[i]);
-						}
-						printf("\n");
-					}
-#endif
 				} else {
 					std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n";
 				}
@@ -1125,7 +1136,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
 		return false;
 	}
 
-	uint64_t ts = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count();
+	uint64_t ts = Message::getTimeMS();
 
 	const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number);
 	_tox_peer_to_contact[combine_ids(e.group_number, e.peer_number)] = c; // workaround
@@ -1152,7 +1163,7 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
 
 	reg.emplace(new_msg_e, e.message_id);
 
-	reg.emplace(new_msg_e); // add sending?
+	//reg.emplace(new_msg_e); // add sending?
 	reg.emplace(new_msg_e, ts);
 	//reg.emplace(new_msg_e, 0);
@@ -1174,58 +1185,39 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) {
 
 	// check if content exists
 	const auto sha1_info_hash = std::vector{e.file_id, e.file_id+e.file_id_size};
-	ObjectHandle ce;
+	ObjectHandle o;
 	if (_info_to_content.count(sha1_info_hash)) {
-		ce = _info_to_content.at(sha1_info_hash);
+		o = _info_to_content.at(sha1_info_hash);
 		std::cout << "SHA1_NGCFT1: new message has existing content\n";
 	} else {
 		// TODO: backend
-		ce = {_os.registry(), _os.registry().create()};
-		_info_to_content[sha1_info_hash] = ce;
+		o = _mfb.newObject(ByteSpan{sha1_info_hash});
+		_info_to_content[sha1_info_hash] = o;
+		o.emplace(sha1_info_hash);
 		std::cout << "SHA1_NGCFT1: new message has new content\n";
-
-		//ce.emplace(sha1_info);
-		//ce.emplace(sha1_info_data); // keep around? or file?
-		ce.emplace(sha1_info_hash);
 	}
 
-	ce.get_or_emplace().messages.push_back({reg, new_msg_e});
-	reg_ptr->emplace(new_msg_e, ce);
+	o.get_or_emplace().messages.push_back({reg, new_msg_e});
+	reg_ptr->emplace(new_msg_e, o);
 
 	// HACK: assume the message sender is participating. usually a safe bet.
-	if (addParticipation(c, ce)) {
+	if (addParticipation(c, o)) {
 		// something happend, update chunk picker
 		assert(static_cast(c));
 		c.emplace_or_replace();
 	}
 
 	// HACK: assume the message sender has all
-	ce.get_or_emplace().others[c] = {true, {}};
+	o.get_or_emplace().others[c] = {true, {}};
 
-	if (!ce.all_of() && !ce.all_of()) {
+	if (!o.all_of() && !o.all_of()) {
 		// TODO: check if already receiving
-		_queue_content_want_info.push_back(ce);
+		_queue_content_want_info.push_back(o);
 	}
 
+	// since public
+	o.get_or_emplace().targets.emplace(c.get().parent);
+
 	// TODO: queue info dl
-
-	//reg_ptr->emplace(e, sha1_info);
-	//reg_ptr->emplace(e, sha1_info_data); // keep around? or file?
-	//reg.emplace(new_msg_e, std::vector{e.file_id, e.file_id+e.file_id_size});
-
-	if (auto* cc = ce.try_get(); cc != nullptr && cc->have_all) {
-		reg_ptr->emplace(new_msg_e);
-	}
-
-	if (ce.all_of()) {
-		reg_ptr->emplace(new_msg_e, ce.get());
-	}
-	if (ce.all_of()) {
-		reg_ptr->emplace(new_msg_e, ce.get());
-	}
-	if (ce.all_of()) {
-		reg_ptr->emplace(new_msg_e, ce.get());
-	}
-
 	// TODO: queue info/check if we already have info
 
 	_rmm.throwEventConstruct(reg, new_msg_e);
@@ -1249,7 +1241,7 @@ bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std
 	}
 
 	// get current time unix epoch utc
-	uint64_t ts = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count();
+	uint64_t ts = Message::getTimeMS();
 
 	_mfb.newFromFile(
 		file_name, file_path,
@@ -1301,8 +1293,8 @@ bool SHA1_NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) {
 
 	for (const auto& [_, o] : _info_to_content) {
 		removeParticipation(c, o);
-		if (o.all_of()) {
-			o.get().others.erase(c);
+		if (o.all_of()) {
+			o.get().others.erase(c);
 		}
 	}
 }
@@ -1356,10 +1348,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have& e) {
 		c.emplace_or_replace();
 	}
 
-	auto& remote_have = o.get_or_emplace().others;
+	auto& remote_have = o.get_or_emplace().others;
 	if (!remote_have.contains(c)) {
 		// init
-		remote_have.emplace(c, Components::RemoteHave::Entry{false, num_total_chunks});
+		remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
 
 		// new have? nice
 		// (always update on biset, not always on have)
@@ -1443,10 +1435,10 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_bitset& e) {
 	// we might not know yet
 	addParticipation(c, o);
 
-	auto& remote_have = o.get_or_emplace().others;
+	auto& remote_have = o.get_or_emplace().others;
 	if (!remote_have.contains(c)) {
 		// init
-		remote_have.emplace(c, Components::RemoteHave::Entry{false, num_total_chunks});
+		remote_have.emplace(c, Components::RemoteHaveBitset::Entry{false, num_total_chunks});
 	}
 
 	auto& remote_have_peer = remote_have.at(c);
@@ -1507,8 +1499,8 @@ bool SHA1_NGCFT1::onEvent(const Events::NGCEXT_ft1_have_all& e) {
 	// we might not know yet
 	addParticipation(c, o);
 
-	auto& remote_have = o.get_or_emplace().others;
-	remote_have[c] = Components::RemoteHave::Entry{true, {}};
+	auto& remote_have = o.get_or_emplace().others;
+	remote_have[c] = Components::RemoteHaveBitset::Entry{true, {}};
 
 	// new have? nice
 	// (always update on have_all, not always on have)
diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
index 99d9bf1..2fc4a1b 100644
--- a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
+++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp
@@ -20,7 +20,7 @@
 #include
 #include
 
-class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public NGCFT1EventI, public NGCEXTEventI {
+class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public ObjectStoreEventI, public NGCFT1EventI, public NGCEXTEventI {
 	ObjectStore2& _os;
 	// TODO: backend abstraction
 	Contact3Registry& _cr;
@@ -75,6 +75,9 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
 
 		void queueBitsetSendFull(Contact3Handle c, ObjectHandle o);
 
+		File2I* objGetFile2Write(ObjectHandle o);
+		File2I* objGetFile2Read(ObjectHandle o);
+
 	public: // TODO: config
 		bool _udp_only {false};
 
@@ -97,7 +100,10 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
 		void onSendFileHashFinished(ObjectHandle o, Message3Registry* reg_ptr, Contact3 c, uint64_t ts);
 
 	protected: // rmm events (actions)
-		bool onEvent(const Message::Events::MessageUpdated&) override;
+		bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override;
+
+	protected: // os events (actions)
+		bool onEvent(const ObjectStore::Events::ObjectUpdate&) override;
 
 	protected: // events
 		bool onEvent(const Events::NGCFT1_recv_request&) override;
@@ -108,8 +114,6 @@ class SHA1_NGCFT1 : public ToxEventI, public RegistryMessageModelEventI, public
 		bool onEvent(const Events::NGCFT1_send_done&) override;
 		bool onEvent(const Events::NGCFT1_recv_message&) override;
 
-		bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override;
-
 		bool onToxEvent(const Tox_Event_Group_Peer_Join* e) override;
 		bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override;
diff --git a/solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp b/solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp
index da01847..7b2c8bb 100644
--- a/solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp
+++ b/solanaceae/ngc_ft1_sha1/transfer_stats_systems.cpp
@@ -1,6 +1,8 @@
 #include "./transfer_stats_systems.hpp"
 #include "./components.hpp"
 
+#include
+
 #include
 
 namespace Systems {
@@ -88,7 +90,7 @@ void transfer_tally_update(ObjectRegistry& os_reg, const float time_now) {
 
 	// for each stats separated -> stats (total)
 	os_reg.view().each([&os_reg](const auto ov, Components::TransferStatsSeparated& tss_comp, const auto&) {
-		Components::TransferStats& stats = os_reg.get_or_emplace(ov);
+		auto& stats = os_reg.get_or_emplace(ov);
 		stats = {}; // reset
 
 		for (const auto& [_, peer_stats] : tss_comp.stats) {
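Aside, not part of the patch: the tally in the last hunk just folds the per-peer stats into one object-wide total each tick. A sketch, assuming the replacement stats component keeps the same fields as the removed Components::TransferStats (rate_up/rate_down/total_up/total_down); its exact ObjComp name is stripped in this paste:

// Sketch: per-peer stats -> object total.
auto& total = os_reg.get_or_emplace<HypotheticalTransferStats>(ov); // stand-in for the real ObjComp type
total = {}; // reset before summing
for (const auto& [_, peer_stats] : tss_comp.stats) {
	total.rate_up    += peer_stats.rate_up;
	total.rate_down  += peer_stats.rate_down;
	total.total_up   += peer_stats.total_up;
	total.total_down += peer_stats.total_down;
}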