os backend refactor
This commit is contained in:
parent
4fc229a531
commit
50db703d78
@ -42,7 +42,7 @@ SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api)
|
||||
// static store, could be anywhere tho
|
||||
// construct with fetched dependencies
|
||||
g_fsb = std::make_unique<Backends::FilesystemStorage>(*os, "test2_message_store/"); // TODO: use config?
|
||||
g_mfs = std::make_unique<MessageFragmentStore>(*cr, *rmm, *os, *g_fsb, *msnj);
|
||||
g_mfs = std::make_unique<MessageFragmentStore>(*cr, *rmm, *os, *g_fsb, *g_fsb, *msnj);
|
||||
|
||||
// register types
|
||||
PLUG_PROVIDE_INSTANCE(MessageFragmentStore, plugin_name, g_mfs.get());
|
||||
|
@ -40,9 +40,9 @@ int main(int argc, const char** argv) {
|
||||
// they only exist for the serializers (for now)
|
||||
// TODO: version
|
||||
MessageSerializerNJ msnj_src{cr, os_src, {}, {}};
|
||||
MessageFragmentStore mfs_src(cr, rmm, os_src, fsb_src, msnj_src);
|
||||
MessageFragmentStore mfs_src(cr, rmm, os_src, fsb_src, fsb_src, msnj_src);
|
||||
MessageSerializerNJ msnj_dst{cr, os_dst, {}, {}};
|
||||
MessageFragmentStore mfs_dst(cr, rmm, os_dst, fsb_dst, msnj_dst);
|
||||
MessageFragmentStore mfs_dst(cr, rmm, os_dst, fsb_dst, fsb_dst, msnj_dst);
|
||||
|
||||
// add message fragment store too (adds meta?)
|
||||
|
||||
@ -78,7 +78,7 @@ int main(int argc, const char** argv) {
|
||||
// technically we could just copy the file, but meh
|
||||
// read src and write dst data
|
||||
std::vector<uint8_t> tmp_buffer;
|
||||
std::function<StorageBackendI::read_from_storage_put_data_cb> cb = [&tmp_buffer](const ByteSpan buffer) {
|
||||
std::function<StorageBackendIAtomic::read_from_storage_put_data_cb> cb = [&tmp_buffer](const ByteSpan buffer) {
|
||||
tmp_buffer.insert(tmp_buffer.end(), buffer.cbegin(), buffer.cend());
|
||||
};
|
||||
if (!_fsb_src.read(e.e, cb)) {
|
||||
@ -140,7 +140,7 @@ int main(int argc, const char** argv) {
|
||||
}
|
||||
}
|
||||
|
||||
static_cast<StorageBackendI&>(_fsb_dst).write(oh, ByteSpan{tmp_buffer});
|
||||
static_cast<StorageBackendIAtomic&>(_fsb_dst).write(oh, ByteSpan{tmp_buffer});
|
||||
|
||||
//assert(std::filesystem::file_size(e.e.get<ObjComp::Ephemeral::FilePath>().path) == std::filesystem::file_size(oh.get<ObjComp::Ephemeral::FilePath>().path));
|
||||
|
||||
|
@ -1,6 +1,8 @@
|
||||
#include "./message_fragment_store.hpp"
|
||||
|
||||
#include "./internal_mfs_contexts.hpp"
|
||||
#include "solanaceae/object_store/meta_components.hpp"
|
||||
#include "solanaceae/object_store/object_store.hpp"
|
||||
|
||||
#include <solanaceae/object_store/serializer_json.hpp>
|
||||
|
||||
@ -41,12 +43,12 @@ namespace ObjectStore::Components {
|
||||
} // ObjectStore::Component
|
||||
|
||||
static nlohmann::json loadFromStorageNJ(ObjectHandle oh) {
|
||||
assert(oh.all_of<ObjComp::Ephemeral::Backend>());
|
||||
auto* backend = oh.get<ObjComp::Ephemeral::Backend>().ptr;
|
||||
assert(oh.all_of<ObjComp::Ephemeral::BackendAtomic>());
|
||||
auto* backend = oh.get<ObjComp::Ephemeral::BackendAtomic>().ptr;
|
||||
assert(backend != nullptr);
|
||||
|
||||
std::vector<uint8_t> tmp_buffer;
|
||||
std::function<StorageBackendI::read_from_storage_put_data_cb> cb = [&tmp_buffer](const ByteSpan buffer) {
|
||||
std::function<StorageBackendIAtomic::read_from_storage_put_data_cb> cb = [&tmp_buffer](const ByteSpan buffer) {
|
||||
tmp_buffer.insert(tmp_buffer.end(), buffer.cbegin(), buffer.cend());
|
||||
};
|
||||
if (!backend->read(oh, cb)) {
|
||||
@ -184,7 +186,9 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
|
||||
if (!_os.registry().valid(fragment_id)) {
|
||||
const auto new_uuid = _session_uuid_gen();
|
||||
_fs_ignore_event = true;
|
||||
auto fh = _sb.newObject(ByteSpan{new_uuid});
|
||||
auto fh = _sbm.newObject(ByteSpan{new_uuid});
|
||||
// TODO: the backend should have done that?
|
||||
fh.emplace_or_replace<ObjComp::Ephemeral::BackendAtomic>(&_sba);
|
||||
_fs_ignore_event = false;
|
||||
if (!static_cast<bool>(fh)) {
|
||||
std::cout << "MFS error: failed to create new object for message\n";
|
||||
@ -482,8 +486,8 @@ bool MessageFragmentStore::syncFragToStorage(ObjectHandle fh, Message3Registry&
|
||||
std::cerr << "MFS error: unknown object version\n";
|
||||
assert(false);
|
||||
}
|
||||
assert(fh.all_of<ObjComp::Ephemeral::Backend>());
|
||||
auto* backend = fh.get<ObjComp::Ephemeral::Backend>().ptr;
|
||||
assert(fh.all_of<ObjComp::Ephemeral::BackendAtomic>());
|
||||
auto* backend = fh.get<ObjComp::Ephemeral::BackendAtomic>().ptr;
|
||||
if (backend->write(fh, {reinterpret_cast<const uint8_t*>(data_to_save.data()), data_to_save.size()})) {
|
||||
// TODO: make this better, should this be called on fail? should this be called before sync? (prob not)
|
||||
_fs_ignore_event = true;
|
||||
@ -503,9 +507,10 @@ MessageFragmentStore::MessageFragmentStore(
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModelI& rmm,
|
||||
ObjectStore2& os,
|
||||
StorageBackendI& sb,
|
||||
StorageBackendIMeta& sbm,
|
||||
StorageBackendIAtomic& sba,
|
||||
MessageSerializerNJ& scnj
|
||||
) : _cr(cr), _rmm(rmm), _rmm_sr(_rmm.newSubRef(this)), _os(os), _os_sr(_os.newSubRef(this)), _sb(sb), _scnj(scnj) {
|
||||
) : _cr(cr), _rmm(rmm), _rmm_sr(_rmm.newSubRef(this)), _os(os), _os_sr(_os.newSubRef(this)), _sbm(sbm), _sba(sba), _scnj(scnj) {
|
||||
_rmm_sr
|
||||
.subscribe(RegistryMessageModel_Event::message_construct)
|
||||
.subscribe(RegistryMessageModel_Event::message_updated)
|
||||
|
@ -55,7 +55,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public ObjectSto
|
||||
RegistryMessageModelI::SubscriptionReference _rmm_sr;
|
||||
ObjectStore2& _os;
|
||||
ObjectStore2::SubscriptionReference _os_sr;
|
||||
StorageBackendI& _sb;
|
||||
StorageBackendIMeta& _sbm;
|
||||
StorageBackendIAtomic& _sba;
|
||||
MessageSerializerNJ& _scnj;
|
||||
|
||||
bool _fs_ignore_event {false};
|
||||
@ -94,7 +95,8 @@ class MessageFragmentStore : public RegistryMessageModelEventI, public ObjectSto
|
||||
Contact3Registry& cr,
|
||||
RegistryMessageModelI& rmm,
|
||||
ObjectStore2& os,
|
||||
StorageBackendI& sb,
|
||||
StorageBackendIMeta& sbm,
|
||||
StorageBackendIAtomic& sba,
|
||||
MessageSerializerNJ& scnj
|
||||
);
|
||||
virtual ~MessageFragmentStore(void);
|
||||
|
Loading…
x
Reference in New Issue
Block a user