From 6d150ba44154798b1716d30eba2d81ee4135d9b5 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Mon, 1 Apr 2024 17:57:14 +0200 Subject: [PATCH] roll back meta comp (did it wrong) and enable data comp --- src/fragment_store/fragment_store.cpp | 117 +++++++++++++------------- 1 file changed, 60 insertions(+), 57 deletions(-) diff --git a/src/fragment_store/fragment_store.cpp b/src/fragment_store/fragment_store.cpp index 1b00d644..c144a325 100644 --- a/src/fragment_store/fragment_store.cpp +++ b/src/fragment_store/fragment_store.cpp @@ -270,22 +270,23 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(fid)) { - _reg.emplace_or_replace(fid, Encryption::NONE); - //} - //if (_reg.all_of(fid)) { - _reg.emplace_or_replace(fid, Compression::NONE); - //} + _reg.emplace_or_replace(fid, Encryption::NONE); + _reg.emplace_or_replace(fid, Compression::NONE); } std::filesystem::path meta_tmp_path = _reg.get(fid).path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp"; meta_tmp_path.replace_filename("." + meta_tmp_path.filename().generic_u8string()); - auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp); - //meta_file_stack.push(std::make_unique(std::string_view{meta_tmp_path.generic_u8string()}, true)); + // TODO: make meta comp work with mem compressor + //auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp); + std::stack> meta_file_stack; + meta_file_stack.push(std::make_unique(std::string_view{meta_tmp_path.generic_u8string()})); + + if (!meta_file_stack.top()->isGood()) { + std::cerr << "FS error: opening file for writing '" << meta_tmp_path << "'\n"; + return {}; + } if (meta_file_stack.empty()) { std::cerr << "FS error: failed to create temporary meta file stack\n"; @@ -293,20 +294,19 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(fid)) { + data_enc = _reg.get(fid).enc; + } if (_reg.all_of(fid)) { data_comp = _reg.get(fid).comp; } std::filesystem::path data_tmp_path = _reg.get(fid).path + ".tmp"; data_tmp_path.replace_filename("." + data_tmp_path.filename().generic_u8string()); - std::ofstream data_file{ - data_tmp_path, - std::ios::out | std::ios::trunc | std::ios::binary // always binary, also for text - }; - - if (!data_file.is_open()) { - //meta_file.close(); + auto data_file_stack = buildFileStackWrite(std::string_view{data_tmp_path.generic_u8string()}, data_enc, data_comp); + if (!data_file_stack.empty()) { while (!meta_file_stack.empty()) { meta_file_stack.pop(); } std::filesystem::remove(meta_tmp_path); std::cerr << "FS error: failed to create temporary data file\n"; @@ -386,8 +386,10 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function buffer; + //if (data_comp == Compression::NONE) { + // for zstd compression, chunk size is frame size. (no cross frame referencing) + static constexpr int64_t chunk_size{1024*1024*10}; + std::array buffer; uint64_t buffer_actual_size {0}; do { buffer_actual_size = data_cb(buffer.data(), buffer.size()); @@ -399,56 +401,57 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function(buffer.data()), buffer_actual_size); + data_file_stack.top()->write({buffer.data(), buffer_actual_size}); } while (buffer_actual_size == buffer.size()); - } else if (data_comp == Compression::ZSTD) { - std::vector buffer(ZSTD_CStreamInSize()); - std::vector compressed_buffer(ZSTD_CStreamOutSize()); - uint64_t buffer_actual_size {0}; + //} else if (data_comp == Compression::ZSTD) { + //std::vector buffer(ZSTD_CStreamInSize()); + //std::vector compressed_buffer(ZSTD_CStreamOutSize()); + //uint64_t buffer_actual_size {0}; - std::unique_ptr cctx{ZSTD_createCCtx(), &ZSTD_freeCCtx}; - ZSTD_CCtx_setParameter(cctx.get(), ZSTD_c_compressionLevel, 0); // default (3) - ZSTD_CCtx_setParameter(cctx.get(), ZSTD_c_checksumFlag, 1); // add extra checksums (to frames?) - do { - buffer_actual_size = data_cb(buffer.data(), buffer.size()); - //if (buffer_actual_size == 0) { + //std::unique_ptr cctx{ZSTD_createCCtx(), &ZSTD_freeCCtx}; + //ZSTD_CCtx_setParameter(cctx.get(), ZSTD_c_compressionLevel, 0); // default (3) + //ZSTD_CCtx_setParameter(cctx.get(), ZSTD_c_checksumFlag, 1); // add extra checksums (to frames?) + //do { + //buffer_actual_size = data_cb(buffer.data(), buffer.size()); + ////if (buffer_actual_size == 0) { + ////break; + ////} + //if (buffer_actual_size > buffer.size()) { + //// wtf //break; //} - if (buffer_actual_size > buffer.size()) { - // wtf - break; - } - bool const lastChunk = (buffer_actual_size < buffer.size()); + //bool const lastChunk = (buffer_actual_size < buffer.size()); - ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue; - ZSTD_inBuffer input = { buffer.data(), buffer_actual_size, 0 }; + //ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue; + //ZSTD_inBuffer input = { buffer.data(), buffer_actual_size, 0 }; - while (input.pos < input.size) { - ZSTD_outBuffer output = { compressed_buffer.data(), compressed_buffer.size(), 0 }; + //while (input.pos < input.size) { + //ZSTD_outBuffer output = { compressed_buffer.data(), compressed_buffer.size(), 0 }; - size_t const remaining = ZSTD_compressStream2(cctx.get(), &output , &input, mode); - if (ZSTD_isError(remaining)) { - std::cerr << "FS error: compressing data failed\n"; - break; - } + //size_t const remaining = ZSTD_compressStream2(cctx.get(), &output , &input, mode); + //if (ZSTD_isError(remaining)) { + //std::cerr << "FS error: compressing data failed\n"; + //break; + //} - data_file.write(reinterpret_cast(compressed_buffer.data()), output.pos); + //data_file.write(reinterpret_cast(compressed_buffer.data()), output.pos); - if (remaining == 0) { - break; - } - } - // same as if lastChunk break; - } while (buffer_actual_size == buffer.size()); - } else { - assert(false && "implement me"); - } + //if (remaining == 0) { + //break; + //} + //} + //// same as if lastChunk break; + //} while (buffer_actual_size == buffer.size()); + //} else { + //assert(false && "implement me"); + //} //meta_file.flush(); //meta_file.close(); while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope? - data_file.flush(); - data_file.close(); + //data_file.flush(); + //data_file.close(); + while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope? std::filesystem::rename( meta_tmp_path,