switch to streaming compressor for data to drastically improve ratio.

would still benefit from a abstract file refactor
This commit is contained in:
Green Sky 2024-02-18 18:19:49 +01:00
parent 182d844e32
commit 4fb2b51b7d
No known key found for this signature in database

View File

@ -340,6 +340,10 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
compressed_buffer.resize(ZSTD_compressBound(res.size())); compressed_buffer.resize(ZSTD_compressBound(res.size()));
size_t const cSize = ZSTD_compress(compressed_buffer.data(), compressed_buffer.size(), res.data(), res.size(), 0); // 0 is default is probably 3 size_t const cSize = ZSTD_compress(compressed_buffer.data(), compressed_buffer.size(), res.data(), res.size(), 0); // 0 is default is probably 3
if (ZSTD_isError(cSize)) {
std::cerr << "FS error: compressing meta failed\n";
return false; // HACK
}
compressed_buffer.resize(cSize); // maybe skip this resize compressed_buffer.resize(cSize); // maybe skip this resize
@ -351,6 +355,7 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
} }
// now data // now data
if (data_comp == Compression::NONE) {
std::array<uint8_t, 1024> buffer; std::array<uint8_t, 1024> buffer;
uint64_t buffer_actual_size {0}; uint64_t buffer_actual_size {0};
do { do {
@ -363,17 +368,48 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
break; break;
} }
if (data_comp == Compression::NONE) {
data_file.write(reinterpret_cast<const char*>(buffer.data()), buffer_actual_size); data_file.write(reinterpret_cast<const char*>(buffer.data()), buffer_actual_size);
} else if (data_comp == Compression::ZSTD) {
std::vector<uint8_t> compressed_buffer;
compressed_buffer.resize(ZSTD_compressBound(buffer_actual_size));
size_t const cSize = ZSTD_compress(compressed_buffer.data(), compressed_buffer.size(), buffer.data(), buffer_actual_size, 0); // 0 is default is probably 3
data_file.write(reinterpret_cast<const char*>(compressed_buffer.data()), cSize);
}
} while (buffer_actual_size == buffer.size()); } while (buffer_actual_size == buffer.size());
} else if (data_comp == Compression::ZSTD) {
std::vector<uint8_t> buffer(ZSTD_CStreamInSize());
std::vector<uint8_t> compressed_buffer(ZSTD_CStreamOutSize());
uint64_t buffer_actual_size {0};
ZSTD_CCtx* const cctx = ZSTD_createCCtx();
ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 0); // default (3)
ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1); // add extra checksums (to frames?)
do {
buffer_actual_size = data_cb(buffer.data(), buffer.size());
//if (buffer_actual_size == 0) {
//break;
//}
if (buffer_actual_size > buffer.size()) {
// wtf
break;
}
bool const lastChunk = (buffer_actual_size < buffer.size());
ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue;
ZSTD_inBuffer input = { buffer.data(), buffer_actual_size, 0 };
while (input.pos < input.size) {
ZSTD_outBuffer output = { compressed_buffer.data(), compressed_buffer.size(), 0 };
size_t const remaining = ZSTD_compressStream2(cctx, &output , &input, mode);
if (ZSTD_isError(remaining)) {
std::cerr << "FS error: compressing data failed\n";
break;
}
data_file.write(reinterpret_cast<const char*>(compressed_buffer.data()), output.pos);
if (remaining == 0) {
break;
}
}
// same as if lastChunk break;
} while (buffer_actual_size == buffer.size());
}
meta_file.flush(); meta_file.flush();
data_file.flush(); data_file.flush();