fixes missing comps and conversion tool is almost working (most of meta is transfered)
This commit is contained in:
parent
5c3b797a99
commit
854ed851b4
@ -40,6 +40,10 @@ static ByteSpan spanFromRead(const std::variant<ByteSpan, std::vector<uint8_t>>&
|
|||||||
|
|
||||||
// TODO: move somewhere else
|
// TODO: move somewhere else
|
||||||
static bool serl_json_data_enc_type(const ObjectHandle oh, nlohmann::json& out) {
|
static bool serl_json_data_enc_type(const ObjectHandle oh, nlohmann::json& out) {
|
||||||
|
if (!oh.all_of<ObjComp::DataEncryptionType>()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
out = static_cast<std::underlying_type_t<Encryption>>(
|
out = static_cast<std::underlying_type_t<Encryption>>(
|
||||||
oh.get<ObjComp::DataEncryptionType>().enc
|
oh.get<ObjComp::DataEncryptionType>().enc
|
||||||
);
|
);
|
||||||
@ -56,6 +60,10 @@ static bool deserl_json_data_enc_type(ObjectHandle oh, const nlohmann::json& in)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static bool serl_json_data_comp_type(const ObjectHandle oh, nlohmann::json& out) {
|
static bool serl_json_data_comp_type(const ObjectHandle oh, nlohmann::json& out) {
|
||||||
|
if (!oh.all_of<ObjComp::DataCompressionType>()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
out = static_cast<std::underlying_type_t<Compression>>(
|
out = static_cast<std::underlying_type_t<Compression>>(
|
||||||
oh.get<ObjComp::DataCompressionType>().comp
|
oh.get<ObjComp::DataCompressionType>().comp
|
||||||
);
|
);
|
||||||
@ -524,6 +532,9 @@ size_t FilesystemStorage::scanPath(std::string_view path) {
|
|||||||
// main thread
|
// main thread
|
||||||
// TODO: check timestamps of preexisting and reload? mark external/remote dirty?
|
// TODO: check timestamps of preexisting and reload? mark external/remote dirty?
|
||||||
for (const auto& it : file_obj_list) {
|
for (const auto& it : file_obj_list) {
|
||||||
|
MetaFileType mft {MetaFileType::TEXT_JSON};
|
||||||
|
Encryption meta_enc {Encryption::NONE};
|
||||||
|
Compression meta_comp {Compression::NONE};
|
||||||
nlohmann::json j;
|
nlohmann::json j;
|
||||||
if (it.meta_ext == ".meta.msgpack") {
|
if (it.meta_ext == ".meta.msgpack") {
|
||||||
std::ifstream file(it.obj_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary);
|
std::ifstream file(it.obj_path.generic_u8string() + it.meta_ext, std::ios::in | std::ios::binary);
|
||||||
@ -532,6 +543,8 @@ size_t FilesystemStorage::scanPath(std::string_view path) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mft = MetaFileType::BINARY_MSGPACK;
|
||||||
|
|
||||||
// file is a msgpack within a msgpack
|
// file is a msgpack within a msgpack
|
||||||
|
|
||||||
std::vector<uint8_t> full_meta_data;
|
std::vector<uint8_t> full_meta_data;
|
||||||
@ -558,13 +571,13 @@ size_t FilesystemStorage::scanPath(std::string_view path) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
Encryption meta_enc = meta_header_j.at(1);
|
meta_enc = meta_header_j.at(1);
|
||||||
if (meta_enc != Encryption::NONE) {
|
if (meta_enc != Encryption::NONE) {
|
||||||
std::cerr << "FS error: unknown encryption " << it.obj_path << "\n";
|
std::cerr << "FS error: unknown encryption " << it.obj_path << "\n";
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
Compression meta_comp = meta_header_j.at(2);
|
meta_comp = meta_header_j.at(2);
|
||||||
if (meta_comp != Compression::NONE && meta_comp != Compression::ZSTD) {
|
if (meta_comp != Compression::NONE && meta_comp != Compression::ZSTD) {
|
||||||
std::cerr << "FS error: unknown compression " << it.obj_path << "\n";
|
std::cerr << "FS error: unknown compression " << it.obj_path << "\n";
|
||||||
continue;
|
continue;
|
||||||
@ -598,6 +611,8 @@ size_t FilesystemStorage::scanPath(std::string_view path) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mft = MetaFileType::TEXT_JSON;
|
||||||
|
|
||||||
file >> j;
|
file >> j;
|
||||||
} else {
|
} else {
|
||||||
assert(false);
|
assert(false);
|
||||||
@ -613,6 +628,9 @@ size_t FilesystemStorage::scanPath(std::string_view path) {
|
|||||||
ObjectHandle oh{_os.registry(), _os.registry().create()};
|
ObjectHandle oh{_os.registry(), _os.registry().create()};
|
||||||
oh.emplace<ObjComp::Ephemeral::Backend>(this);
|
oh.emplace<ObjComp::Ephemeral::Backend>(this);
|
||||||
oh.emplace<ObjComp::ID>(hex2bin(it.id_str));
|
oh.emplace<ObjComp::ID>(hex2bin(it.id_str));
|
||||||
|
oh.emplace<ObjComp::Ephemeral::MetaFileType>(mft);
|
||||||
|
oh.emplace<ObjComp::Ephemeral::MetaEncryptionType>(meta_enc);
|
||||||
|
oh.emplace<ObjComp::Ephemeral::MetaCompressionType>(meta_comp);
|
||||||
|
|
||||||
oh.emplace<ObjComp::Ephemeral::FilePath>(it.obj_path.generic_u8string());
|
oh.emplace<ObjComp::Ephemeral::FilePath>(it.obj_path.generic_u8string());
|
||||||
|
|
||||||
|
@ -27,7 +27,7 @@ struct FilesystemStorage : public StorageBackendI {
|
|||||||
size_t scanPath(std::string_view path);
|
size_t scanPath(std::string_view path);
|
||||||
void scanPathAsync(std::string path);
|
void scanPathAsync(std::string path);
|
||||||
|
|
||||||
private:
|
public: // TODO: private?
|
||||||
// this thing needs to change and be facilitated over the OS
|
// this thing needs to change and be facilitated over the OS
|
||||||
// but the json serializer are specific to the backend
|
// but the json serializer are specific to the backend
|
||||||
SerializerCallbacks<Object> _sc;
|
SerializerCallbacks<Object> _sc;
|
||||||
|
@ -1,9 +1,14 @@
|
|||||||
#include "./object_store.hpp"
|
#include "./object_store.hpp"
|
||||||
#include "./backends/filesystem_storage.hpp"
|
#include "./backends/filesystem_storage.hpp"
|
||||||
|
#include "./meta_components.hpp"
|
||||||
|
|
||||||
|
#include <nlohmann/json.hpp>
|
||||||
|
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
|
|
||||||
int main(int argc, const char** argv) {
|
int main(int argc, const char** argv) {
|
||||||
if (argc != 3) {
|
if (argc != 3) {
|
||||||
std::cerr << "wrong paramter count, do " << argv[0] << " <input_folder> <output_folder>\n";
|
std::cerr << "wrong paramter count, do " << argv[0] << " <input_folder> <output_folder>\n";
|
||||||
@ -26,6 +31,77 @@ int main(int argc, const char** argv) {
|
|||||||
// add message fragment store too (adds meta?)
|
// add message fragment store too (adds meta?)
|
||||||
|
|
||||||
// hookup events
|
// hookup events
|
||||||
|
struct EventListener : public ObjectStoreEventI {
|
||||||
|
ObjectStore2& _os_src;
|
||||||
|
backend::FilesystemStorage& _fsb_src;
|
||||||
|
|
||||||
|
ObjectStore2& _os_dst;
|
||||||
|
backend::FilesystemStorage& _fsb_dst;
|
||||||
|
|
||||||
|
EventListener(
|
||||||
|
ObjectStore2& os_src,
|
||||||
|
backend::FilesystemStorage& fsb_src,
|
||||||
|
ObjectStore2& os_dst,
|
||||||
|
backend::FilesystemStorage& fsb_dst
|
||||||
|
) :
|
||||||
|
_os_src(os_src),
|
||||||
|
_fsb_src(fsb_src),
|
||||||
|
_os_dst(os_dst),
|
||||||
|
_fsb_dst(fsb_dst)
|
||||||
|
{
|
||||||
|
_os_src.subscribe(this, ObjectStore_Event::object_construct);
|
||||||
|
_os_src.subscribe(this, ObjectStore_Event::object_update);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected: // os
|
||||||
|
bool onEvent(const ObjectStore::Events::ObjectConstruct& e) override {
|
||||||
|
assert(e.e.all_of<ObjComp::Ephemeral::MetaFileType>());
|
||||||
|
assert(e.e.all_of<ObjComp::ID>());
|
||||||
|
auto oh = _fsb_dst.newObject(e.e.get<ObjComp::Ephemeral::MetaFileType>().type, ByteSpan{e.e.get<ObjComp::ID>().v});
|
||||||
|
|
||||||
|
if (!static_cast<bool>(oh)) {
|
||||||
|
// already exists
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // sync meta
|
||||||
|
// some hardcoded ehpemeral (besides mft/id)
|
||||||
|
oh.emplace_or_replace<ObjComp::Ephemeral::MetaEncryptionType>(e.e.get_or_emplace<ObjComp::Ephemeral::MetaEncryptionType>());
|
||||||
|
oh.emplace_or_replace<ObjComp::Ephemeral::MetaCompressionType>(e.e.get_or_emplace<ObjComp::Ephemeral::MetaCompressionType>());
|
||||||
|
|
||||||
|
// serializable
|
||||||
|
for (const auto& [type, fn] : _fsb_src._sc._serl_json) {
|
||||||
|
//if (!e.e.registry()->storage(type)->contains(e.e)) {
|
||||||
|
//continue;
|
||||||
|
//}
|
||||||
|
|
||||||
|
// this is hacky but we serialize and then deserialize the component
|
||||||
|
// raw copy might be better in the future
|
||||||
|
nlohmann::json tmp_j;
|
||||||
|
if (fn(e.e, tmp_j)) {
|
||||||
|
_fsb_dst._sc._deserl_json.at(type)(oh, tmp_j);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// read src and write dst data
|
||||||
|
static_cast<StorageBackendI&>(_fsb_dst).write(oh, ByteSpan{});
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool onEvent(const ObjectStore::Events::ObjectUpdate&) override {
|
||||||
|
std::cerr << "Update called\n";
|
||||||
|
assert(false);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} el {
|
||||||
|
os_src,
|
||||||
|
fsb_src,
|
||||||
|
os_dst,
|
||||||
|
fsb_dst,
|
||||||
|
};
|
||||||
|
|
||||||
// perform scan (which triggers events)
|
// perform scan (which triggers events)
|
||||||
fsb_dst.scanAsync(); // fill with existing?
|
fsb_dst.scanAsync(); // fill with existing?
|
||||||
fsb_src.scanAsync(); // the scan
|
fsb_src.scanAsync(); // the scan
|
||||||
|
@ -29,7 +29,7 @@ struct StorageBackendI {
|
|||||||
// calls data_cb with a buffer to be filled in, cb returns actual count of data. if returned < max, its the last buffer.
|
// calls data_cb with a buffer to be filled in, cb returns actual count of data. if returned < max, its the last buffer.
|
||||||
virtual bool write(Object o, std::function<write_to_storage_fetch_data_cb>& data_cb) = 0;
|
virtual bool write(Object o, std::function<write_to_storage_fetch_data_cb>& data_cb) = 0;
|
||||||
//virtual bool write(Object o, const uint8_t* data, const uint64_t data_size); // default impl
|
//virtual bool write(Object o, const uint8_t* data, const uint64_t data_size); // default impl
|
||||||
virtual bool write(Object o, const ByteSpan data); // default impl
|
bool write(Object o, const ByteSpan data);
|
||||||
|
|
||||||
// ========== read object from storage ==========
|
// ========== read object from storage ==========
|
||||||
using read_from_storage_put_data_cb = void(const ByteSpan buffer);
|
using read_from_storage_put_data_cb = void(const ByteSpan buffer);
|
||||||
|
Loading…
Reference in New Issue
Block a user