save msg json zstd compressed (3x compression)

This commit is contained in:
Green Sky 2024-02-18 02:40:17 +01:00
parent fb885b5c21
commit 6f511016bc
No known key found for this signature in database
5 changed files with 86 additions and 13 deletions

View File

@ -45,5 +45,8 @@ if (NOT TARGET zstd::zstd)
)
FetchContent_MakeAvailable(zstd)
add_library(zstd::zstd ALIAS libzstd_static)
add_library(zstd INTERFACE) # somehow zstd fkd this up
target_include_directories(zstd INTERFACE ${zstd_SOURCE_DIR}/lib/)
target_link_libraries(zstd INTERFACE libzstd_static)
add_library(zstd::zstd ALIAS zstd)
endif()

View File

@ -19,6 +19,8 @@ target_link_libraries(fragment_store PUBLIC
EnTT::EnTT
solanaceae_util
zstd::zstd
solanaceae_tox_messages # TODO: move
)

View File

@ -8,6 +8,8 @@
#include <nlohmann/json.hpp>
#include <zstd.h>
#include <cstdint>
#include <fstream>
#include <filesystem>
@ -282,6 +284,11 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
return false;
}
Compression data_comp = Compression::NONE; // TODO: better defaults
if (_reg.all_of<FragComp::DataCompressionType>(fid)) {
data_comp = _reg.get<FragComp::DataCompressionType>(fid).comp;
}
std::ofstream data_file{
_reg.get<FragComp::Ephemeral::FilePath>(fid).path,
std::ios::out | std::ios::trunc | std::ios::binary // always binary, also for text
@ -324,8 +331,22 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
if (meta_type == MetaFileType::BINARY_MSGPACK) { // binary metadata file
const auto res = nlohmann::json::to_msgpack(meta_data);
// TODO: refactor
if (meta_comp == Compression::NONE) {
meta_file.write(reinterpret_cast<const char*>(res.data()), res.size());
} else if (meta_comp == Compression::ZSTD) {
std::vector<uint8_t> compressed_buffer;
compressed_buffer.resize(ZSTD_compressBound(res.size()));
size_t const cSize = ZSTD_compress(compressed_buffer.data(), compressed_buffer.size(), res.data(), res.size(), 0); // 0 is default is probably 3
compressed_buffer.resize(cSize); // maybe skip this resize
meta_file.write(reinterpret_cast<const char*>(compressed_buffer.data()), compressed_buffer.size());
}
} else if (meta_type == MetaFileType::TEXT_JSON) {
// cant be compressed or encrypted
meta_file << meta_data.dump(2, ' ', true);
}
@ -342,7 +363,16 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
break;
}
if (data_comp == Compression::NONE) {
data_file.write(reinterpret_cast<const char*>(buffer.data()), buffer_actual_size);
} else if (data_comp == Compression::ZSTD) {
std::vector<uint8_t> compressed_buffer;
compressed_buffer.resize(ZSTD_compressBound(buffer_actual_size));
size_t const cSize = ZSTD_compress(compressed_buffer.data(), compressed_buffer.size(), buffer.data(), buffer_actual_size, 0); // 0 is default is probably 3
data_file.write(reinterpret_cast<const char*>(compressed_buffer.data()), cSize);
}
} while (buffer_actual_size == buffer.size());
meta_file.flush();
@ -399,6 +429,12 @@ bool FragmentStore::loadFromStorage(FragmentID fid, std::function<read_from_stor
return false;
}
Compression data_comp = Compression::NONE;
if (_reg.all_of<FragComp::DataCompressionType>(fid)) {
data_comp = _reg.get<FragComp::DataCompressionType>(fid).comp;
}
if (data_comp == Compression::NONE) {
std::array<uint8_t, 1024> buffer;
uint64_t buffer_actual_size {0};
do {
@ -411,6 +447,35 @@ bool FragmentStore::loadFromStorage(FragmentID fid, std::function<read_from_stor
data_cb(buffer.data(), buffer_actual_size);
} while (buffer_actual_size == buffer.size() && !data_file.eof());
} else if (data_comp == Compression::ZSTD) {
std::vector<uint8_t> in_buffer(ZSTD_DStreamInSize());
std::vector<uint8_t> out_buffer(ZSTD_DStreamOutSize());
ZSTD_DCtx* const dctx = ZSTD_createDCtx();
uint64_t buffer_actual_size {0};
do {
data_file.read(reinterpret_cast<char*>(in_buffer.data()), in_buffer.size());
buffer_actual_size = data_file.gcount();
if (buffer_actual_size == 0) {
break;
}
ZSTD_inBuffer input {in_buffer.data(), buffer_actual_size, 0 };
do {
ZSTD_outBuffer output = { out_buffer.data(), out_buffer.size(), 0 };
size_t const ret = ZSTD_decompressStream(dctx, &output , &input);
if (ZSTD_isError(ret)) {
// error <.<
std::cerr << "FS error: decompression error\n";
break;
}
data_cb(out_buffer.data(), output.pos);
} while (input.pos < input.size);
} while (buffer_actual_size == in_buffer.size() && !data_file.eof());
ZSTD_freeDCtx(dctx);
}
return true;
}

View File

@ -154,6 +154,8 @@ void MessageFragmentStore::handleMessage(const Message3Handle& m) {
fragment_uid = fh.get<FragComp::ID>().v;
fh.emplace_or_replace<FragComp::DataCompressionType>().comp = Compression::ZSTD;
auto& new_ts_range = fh.emplace<FragComp::MessagesTSRange>();
new_ts_range.begin = msg_ts;
new_ts_range.end = msg_ts;

View File

@ -7,6 +7,7 @@ enum class Encryption : uint8_t {
};
enum class Compression : uint8_t {
NONE = 0x00,
ZSTD = 0x01,
};
enum class MetaFileType : uint8_t {
TEXT_JSON,