add file2 impl for zstd (lightly tested and not integrated yet)
This commit is contained in:
parent
318be9cd62
commit
def7fc1959
@ -1,6 +1,9 @@
|
||||
cmake_minimum_required(VERSION 3.9 FATAL_ERROR)
|
||||
|
||||
add_library(fragment_store
|
||||
./fragment_store/file2_zstd.hpp
|
||||
./fragment_store/file2_zstd.cpp
|
||||
|
||||
./fragment_store/fragment_store_i.hpp
|
||||
./fragment_store/fragment_store_i.cpp
|
||||
./fragment_store/types.hpp
|
||||
@ -19,6 +22,7 @@ target_link_libraries(fragment_store PUBLIC
|
||||
EnTT::EnTT
|
||||
solanaceae_util
|
||||
|
||||
solanaceae_file2
|
||||
zstd::zstd
|
||||
|
||||
solanaceae_tox_messages # TODO: move
|
||||
@ -26,6 +30,16 @@ target_link_libraries(fragment_store PUBLIC
|
||||
|
||||
########################################
|
||||
|
||||
add_executable(test_file_zstd
|
||||
fragment_store/test_file_zstd.cpp
|
||||
)
|
||||
|
||||
target_link_libraries(test_file_zstd PUBLIC
|
||||
fragment_store
|
||||
)
|
||||
|
||||
########################################
|
||||
|
||||
add_library(message_fragment_store
|
||||
./fragment_store/message_serializer.hpp
|
||||
./fragment_store/message_serializer.cpp
|
||||
|
200
src/fragment_store/file2_zstd.cpp
Normal file
200
src/fragment_store/file2_zstd.cpp
Normal file
@ -0,0 +1,200 @@
|
||||
#include "./file2_zstd.hpp"
|
||||
|
||||
#include <cstdint>
|
||||
#include <iostream>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
#include <cassert>
|
||||
|
||||
File2ZSTDW::File2ZSTDW(File2I& real) :
|
||||
File2I(true, false),
|
||||
_real_file(real)
|
||||
{
|
||||
ZSTD_CCtx_setParameter(_cctx.get(), ZSTD_c_compressionLevel, 0); // default (3)
|
||||
ZSTD_CCtx_setParameter(_cctx.get(), ZSTD_c_checksumFlag, 1); // add extra checksums (to frames?)
|
||||
}
|
||||
|
||||
bool File2ZSTDW::isGood(void) {
|
||||
return _real_file.isGood();
|
||||
}
|
||||
|
||||
bool File2ZSTDW::write(const ByteSpan data, int64_t pos) {
|
||||
if (pos != -1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (data.empty()) {
|
||||
return false; // return true?
|
||||
}
|
||||
|
||||
if (data.size < 16) {
|
||||
std::cout << "F2ZSTD warning: each write is a zstd frame and compression suffers significantly for small frames.\n";
|
||||
}
|
||||
|
||||
std::vector<uint8_t> compressed_buffer(ZSTD_CStreamOutSize());
|
||||
|
||||
ZSTD_inBuffer input = { data.ptr, data.size, 0 };
|
||||
|
||||
size_t remaining_ret {0};
|
||||
do {
|
||||
// remaining data in input < compressed_buffer size (heuristic)
|
||||
bool const lastChunk = (input.size - input.pos) <= compressed_buffer.size();
|
||||
|
||||
ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue;
|
||||
|
||||
ZSTD_outBuffer output = { compressed_buffer.data(), compressed_buffer.size(), 0 };
|
||||
|
||||
remaining_ret = ZSTD_compressStream2(_cctx.get(), &output , &input, mode);
|
||||
if (ZSTD_isError(remaining_ret)) {
|
||||
std::cerr << "F2WRZSTD error: compressing data failed\n";
|
||||
break;
|
||||
}
|
||||
|
||||
_real_file.write(ByteSpan{compressed_buffer.data(), output.pos});
|
||||
} while (input.pos < input.size && remaining_ret != 0 && _real_file.isGood());
|
||||
|
||||
return _real_file.isGood();
|
||||
}
|
||||
|
||||
std::variant<ByteSpan, std::vector<uint8_t>> File2ZSTDW::read(uint64_t, int64_t) {
|
||||
return {};
|
||||
}
|
||||
|
||||
// ######################################### decompression
|
||||
|
||||
File2ZSTDR::File2ZSTDR(File2I& real) :
|
||||
File2I(false, true),
|
||||
_real_file(real),
|
||||
|
||||
// 64kib
|
||||
_in_buffer(ZSTD_DStreamInSize()),
|
||||
_out_buffer(ZSTD_DStreamOutSize())
|
||||
{
|
||||
}
|
||||
|
||||
bool File2ZSTDR::isGood(void) {
|
||||
return _real_file.isGood();
|
||||
}
|
||||
|
||||
bool File2ZSTDR::write(const ByteSpan, int64_t) {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::variant<ByteSpan, std::vector<uint8_t>> File2ZSTDR::read(uint64_t size, int64_t pos) {
|
||||
if (pos != -1) {
|
||||
// error, only support streaming (for now)
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<uint8_t> ret_data;
|
||||
|
||||
// actually first we check previous data
|
||||
if (!_decompressed_buffer.empty()) {
|
||||
uint64_t required_size = std::min(size, _decompressed_buffer.size());
|
||||
ret_data.insert(ret_data.end(), _decompressed_buffer.cbegin(), _decompressed_buffer.cbegin() + required_size);
|
||||
_decompressed_buffer.erase(_decompressed_buffer.cbegin(), _decompressed_buffer.cbegin() + required_size);
|
||||
}
|
||||
|
||||
bool eof {false};
|
||||
// outerloop here
|
||||
while (ret_data.size() < size && !eof) {
|
||||
// first make sure we have data in input
|
||||
if (_z_input.src == nullptr || _z_input.pos == _z_input.size) {
|
||||
const auto request_size = _in_buffer.size();
|
||||
if (!feedInput(_real_file.read(request_size, -1))) {
|
||||
return ret_data;
|
||||
}
|
||||
std::cout << "---- fed input " << _z_input.size << "bytes\n";
|
||||
// if _z_input.size < _in_buffer.size() -> assume eof?
|
||||
if (_z_input.size < request_size) {
|
||||
eof = true;
|
||||
std::cout << "---- eof\n";
|
||||
}
|
||||
}
|
||||
|
||||
do {
|
||||
ZSTD_outBuffer output = { _out_buffer.data(), _out_buffer.size(), 0 };
|
||||
size_t const ret = ZSTD_decompressStream(_dctx.get(), &output , &_z_input);
|
||||
if (ZSTD_isError(ret)) {
|
||||
// error <.<
|
||||
std::cerr << "---- error: decompression error\n";
|
||||
return ret_data;
|
||||
}
|
||||
|
||||
// no new decomp data?
|
||||
if (output.pos == 0) {
|
||||
assert(eof || ret == 0);
|
||||
break;
|
||||
}
|
||||
|
||||
int64_t returning_size = std::min<int64_t>(int64_t(size) - int64_t(ret_data.size()), output.pos);
|
||||
assert(returning_size >= 0);
|
||||
if (returning_size > 0) {
|
||||
ret_data.insert(
|
||||
ret_data.end(),
|
||||
reinterpret_cast<const uint8_t*>(output.dst),
|
||||
reinterpret_cast<const uint8_t*>(output.dst) + returning_size
|
||||
);
|
||||
}
|
||||
|
||||
// make sure we keep excess decompressed data
|
||||
if (returning_size < int64_t(output.pos)) {
|
||||
//const auto remaining_size = output.pos - returning_size;
|
||||
_decompressed_buffer.insert(
|
||||
_decompressed_buffer.cend(),
|
||||
reinterpret_cast<const uint8_t*>(output.dst) + returning_size,
|
||||
reinterpret_cast<const uint8_t*>(output.dst) + output.pos
|
||||
);
|
||||
}
|
||||
} while (_z_input.pos < _z_input.size);
|
||||
}
|
||||
|
||||
return ret_data;
|
||||
}
|
||||
|
||||
bool File2ZSTDR::feedInput(std::variant<ByteSpan, std::vector<uint8_t>>&& read_buff) {
|
||||
// TODO: optimize, we copy the buffer, but we might not need to
|
||||
if (std::holds_alternative<ByteSpan>(read_buff)) {
|
||||
const auto& span = std::get<ByteSpan>(read_buff);
|
||||
std::cout << "---- feedInput got span " << span.size << "\n";
|
||||
if (span.size > _in_buffer.size()) {
|
||||
// error, how did we read more than we asked for??
|
||||
return {};
|
||||
}
|
||||
|
||||
if (span.empty()) {
|
||||
_z_input = { _in_buffer.data(), 0, 0 };
|
||||
} else {
|
||||
// cpy
|
||||
_in_buffer = static_cast<std::vector<uint8_t>>(span);
|
||||
_z_input = {
|
||||
_in_buffer.data(),
|
||||
span.size,
|
||||
0
|
||||
};
|
||||
}
|
||||
} else if (std::holds_alternative<std::vector<uint8_t>>(read_buff)) {
|
||||
auto& vec = std::get<std::vector<uint8_t>>(read_buff);
|
||||
std::cout << "---- feedInput got vec " << vec.size() << "\n";
|
||||
if (vec.size() > _in_buffer.size()) {
|
||||
// error, how did we read more than we asked for??
|
||||
return {};
|
||||
}
|
||||
|
||||
// cpy
|
||||
_in_buffer = vec;
|
||||
|
||||
_z_input = {
|
||||
_in_buffer.data(),
|
||||
_in_buffer.size(),
|
||||
0
|
||||
};
|
||||
} else {
|
||||
// error, unsupported return value of read??
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
51
src/fragment_store/file2_zstd.hpp
Normal file
51
src/fragment_store/file2_zstd.hpp
Normal file
@ -0,0 +1,51 @@
|
||||
#pragma once
|
||||
|
||||
#include <solanaceae/file/file2.hpp>
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include <zstd.h>
|
||||
|
||||
// zstd compression wrapper over another file
|
||||
// WARNING: only supports sequential writes
|
||||
struct File2ZSTDW : public File2I {
|
||||
File2I& _real_file;
|
||||
|
||||
// TODO: hide this detail?
|
||||
std::unique_ptr<ZSTD_CCtx, decltype(&ZSTD_freeCCtx)> _cctx{ZSTD_createCCtx(), &ZSTD_freeCCtx};
|
||||
|
||||
File2ZSTDW(File2I& real);
|
||||
virtual ~File2ZSTDW(void) {}
|
||||
|
||||
bool isGood(void) override;
|
||||
|
||||
// for simplicity and potential future seekability each write is its own frame
|
||||
bool write(const ByteSpan data, int64_t pos = -1) override;
|
||||
std::variant<ByteSpan, std::vector<uint8_t>> read(uint64_t size, int64_t pos = -1) override;
|
||||
};
|
||||
|
||||
// zstd decompression wrapper over another file
|
||||
// WARNING: only supports sequential reads
|
||||
// TODO: add seeking support (use frames)
|
||||
struct File2ZSTDR : public File2I {
|
||||
File2I& _real_file;
|
||||
|
||||
// TODO: hide this detail?
|
||||
std::unique_ptr<ZSTD_DCtx, decltype(&ZSTD_freeDCtx)> _dctx{ZSTD_createDCtx(), &ZSTD_freeDCtx};
|
||||
std::vector<uint8_t> _in_buffer;
|
||||
std::vector<uint8_t> _out_buffer;
|
||||
std::vector<uint8_t> _decompressed_buffer; // retains decompressed unread data between read() calls
|
||||
ZSTD_inBuffer _z_input{nullptr, 0, 0};
|
||||
|
||||
File2ZSTDR(File2I& real);
|
||||
virtual ~File2ZSTDR(void) {}
|
||||
|
||||
bool isGood(void) override;
|
||||
|
||||
bool write(const ByteSpan data, int64_t pos = -1) override;
|
||||
std::variant<ByteSpan, std::vector<uint8_t>> read(uint64_t size, int64_t pos = -1) override;
|
||||
|
||||
private:
|
||||
bool feedInput(std::variant<ByteSpan, std::vector<uint8_t>>&& read_buff);
|
||||
};
|
||||
|
151
src/fragment_store/test_file_zstd.cpp
Normal file
151
src/fragment_store/test_file_zstd.cpp
Normal file
@ -0,0 +1,151 @@
|
||||
#include "./file2_zstd.hpp"
|
||||
|
||||
#include <solanaceae/util/span.hpp>
|
||||
#include <solanaceae/file/file2_std.hpp>
|
||||
|
||||
#include <filesystem>
|
||||
#include <iostream>
|
||||
#include <variant>
|
||||
#include <algorithm>
|
||||
#include <cassert>
|
||||
|
||||
const static std::string_view test_text1{"test1 1234 1234 :) 1234 5678 88888888\n"};
|
||||
const static ByteSpan data_test_text1{
|
||||
reinterpret_cast<const uint8_t*>(test_text1.data()),
|
||||
test_text1.size()
|
||||
};
|
||||
|
||||
const static std::string_view test_text2{"test2 1234 1234 :) 1234 5678 88888888\n"};
|
||||
const static ByteSpan data_test_text2{
|
||||
reinterpret_cast<const uint8_t*>(test_text2.data()),
|
||||
test_text2.size()
|
||||
};
|
||||
|
||||
const static std::string_view test_text3{
|
||||
"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
|
||||
"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
|
||||
"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
|
||||
"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
|
||||
"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
|
||||
"00000000000000000000000000000000000000000000000000000 test 00000000000000000000000000000000000000\n"
|
||||
};
|
||||
const static ByteSpan data_test_text3{
|
||||
reinterpret_cast<const uint8_t*>(test_text3.data()),
|
||||
test_text3.size()
|
||||
};
|
||||
|
||||
int main(void) {
|
||||
const auto temp_dir = std::filesystem::temp_directory_path() / "file2wzstdtests";
|
||||
|
||||
std::filesystem::create_directories(temp_dir); // making sure
|
||||
assert(std::filesystem::exists(temp_dir));
|
||||
std::cout << "test temp dir: " << temp_dir << "\n";
|
||||
|
||||
const auto test1_file_path = temp_dir / "testfile1.zstd";
|
||||
{ // simple write test
|
||||
File2WFile f_w_file{test1_file_path.c_str(), true};
|
||||
assert(f_w_file.isGood());
|
||||
|
||||
File2ZSTDW f_w_zstd{f_w_file};
|
||||
assert(f_w_zstd.isGood());
|
||||
assert(f_w_file.isGood());
|
||||
|
||||
//bool res = f_w_file.write(data_test_text1);
|
||||
bool res = f_w_zstd.write(data_test_text1);
|
||||
assert(res);
|
||||
assert(f_w_zstd.isGood());
|
||||
assert(f_w_file.isGood());
|
||||
|
||||
// write another frame of the same data
|
||||
res = f_w_zstd.write(data_test_text2);
|
||||
assert(res);
|
||||
assert(f_w_zstd.isGood());
|
||||
assert(f_w_file.isGood());
|
||||
|
||||
// write larger frame
|
||||
res = f_w_zstd.write(data_test_text3);
|
||||
assert(res);
|
||||
assert(f_w_zstd.isGood());
|
||||
assert(f_w_file.isGood());
|
||||
}
|
||||
|
||||
// after flush
|
||||
assert(std::filesystem::file_size(test1_file_path) != 0);
|
||||
|
||||
{ // simple read test (using write test created file)
|
||||
File2RFile f_r_file{test1_file_path.c_str()};
|
||||
assert(f_r_file.isGood());
|
||||
|
||||
File2ZSTDR f_r_zstd{f_r_file};
|
||||
assert(f_r_zstd.isGood());
|
||||
assert(f_r_file.isGood());
|
||||
|
||||
// reads return owning buffers
|
||||
|
||||
{ // readback data_test_text1
|
||||
auto r_res_var = f_r_zstd.read(data_test_text1.size);
|
||||
|
||||
//assert(f_r_zstd.isGood());
|
||||
//assert(f_r_file.isGood());
|
||||
assert(std::holds_alternative<std::vector<uint8_t>>(r_res_var));
|
||||
const auto& r_res_vec = std::get<std::vector<uint8_t>>(r_res_var);
|
||||
|
||||
//std::cout << "decomp: " << std::string_view{reinterpret_cast<const char*>(r_res_vec.data()), r_res_vec.size()};
|
||||
|
||||
assert(std::get<std::vector<uint8_t>>(r_res_var).size() == data_test_text1.size);
|
||||
assert(std::equal(data_test_text1.cbegin(), data_test_text1.cend(), std::get<std::vector<uint8_t>>(r_res_var).cbegin()));
|
||||
}
|
||||
|
||||
{ // readback data_test_text2
|
||||
auto r_res_var = f_r_zstd.read(data_test_text2.size);
|
||||
|
||||
//assert(f_r_zstd.isGood());
|
||||
//assert(f_r_file.isGood());
|
||||
assert(std::holds_alternative<std::vector<uint8_t>>(r_res_var));
|
||||
const auto& r_res_vec = std::get<std::vector<uint8_t>>(r_res_var);
|
||||
|
||||
//std::cout << "decomp: " << std::string_view{reinterpret_cast<const char*>(r_res_vec.data()), r_res_vec.size()};
|
||||
|
||||
assert(std::get<std::vector<uint8_t>>(r_res_var).size() == data_test_text2.size);
|
||||
assert(std::equal(
|
||||
data_test_text2.cbegin(),
|
||||
data_test_text2.cend(),
|
||||
std::get<std::vector<uint8_t>>(r_res_var).cbegin()
|
||||
));
|
||||
}
|
||||
|
||||
{ // readback data_test_text3
|
||||
auto r_res_var = f_r_zstd.read(data_test_text3.size);
|
||||
|
||||
//assert(f_r_zstd.isGood());
|
||||
//assert(f_r_file.isGood());
|
||||
assert(std::holds_alternative<std::vector<uint8_t>>(r_res_var));
|
||||
const auto& r_res_vec = std::get<std::vector<uint8_t>>(r_res_var);
|
||||
|
||||
//std::cout << "decomp: " << std::string_view{reinterpret_cast<const char*>(r_res_vec.data()), r_res_vec.size()};
|
||||
|
||||
assert(std::get<std::vector<uint8_t>>(r_res_var).size() == data_test_text3.size);
|
||||
assert(std::equal(
|
||||
data_test_text3.cbegin(),
|
||||
data_test_text3.cend(),
|
||||
r_res_vec.cbegin()
|
||||
));
|
||||
}
|
||||
|
||||
{ // assert eof somehow
|
||||
// since its eof, reading a single byte should return a zero sized buffer
|
||||
auto r_res_var = f_r_zstd.read(1);
|
||||
if (std::holds_alternative<std::vector<uint8_t>>(r_res_var)) {
|
||||
assert(std::get<std::vector<uint8_t>>(r_res_var).empty());
|
||||
} else if (std::holds_alternative<ByteSpan>(r_res_var)) {
|
||||
assert(std::get<ByteSpan>(r_res_var).empty());
|
||||
} else {
|
||||
assert(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup
|
||||
std::filesystem::remove_all(temp_dir);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user