forked from Green-Sky/tomato
roll back meta comp (did it wrong) and enable data comp
This commit is contained in:
parent
c737715c66
commit
6d150ba441
@ -270,22 +270,23 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// we cant have encryption or compression
|
// we cant have encryption or compression
|
||||||
|
// so we force NONE for TEXT JSON
|
||||||
|
|
||||||
// TODO: warning/error?
|
|
||||||
|
|
||||||
// TODO: forcing for testing
|
|
||||||
//if (_reg.all_of<Components::Ephemeral::MetaEncryptionType>(fid)) {
|
|
||||||
_reg.emplace_or_replace<FragComp::Ephemeral::MetaEncryptionType>(fid, Encryption::NONE);
|
_reg.emplace_or_replace<FragComp::Ephemeral::MetaEncryptionType>(fid, Encryption::NONE);
|
||||||
//}
|
|
||||||
//if (_reg.all_of<Components::Ephemeral::MetaCompressionType>(fid)) {
|
|
||||||
_reg.emplace_or_replace<FragComp::Ephemeral::MetaCompressionType>(fid, Compression::NONE);
|
_reg.emplace_or_replace<FragComp::Ephemeral::MetaCompressionType>(fid, Compression::NONE);
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::filesystem::path meta_tmp_path = _reg.get<FragComp::Ephemeral::FilePath>(fid).path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp";
|
std::filesystem::path meta_tmp_path = _reg.get<FragComp::Ephemeral::FilePath>(fid).path + ".meta" + metaFileTypeSuffix(meta_type) + ".tmp";
|
||||||
meta_tmp_path.replace_filename("." + meta_tmp_path.filename().generic_u8string());
|
meta_tmp_path.replace_filename("." + meta_tmp_path.filename().generic_u8string());
|
||||||
auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp);
|
// TODO: make meta comp work with mem compressor
|
||||||
//meta_file_stack.push(std::make_unique<File2WFile>(std::string_view{meta_tmp_path.generic_u8string()}, true));
|
//auto meta_file_stack = buildFileStackWrite(std::string_view{meta_tmp_path.generic_u8string()}, meta_enc, meta_comp);
|
||||||
|
std::stack<std::unique_ptr<File2I>> meta_file_stack;
|
||||||
|
meta_file_stack.push(std::make_unique<File2WFile>(std::string_view{meta_tmp_path.generic_u8string()}));
|
||||||
|
|
||||||
|
if (!meta_file_stack.top()->isGood()) {
|
||||||
|
std::cerr << "FS error: opening file for writing '" << meta_tmp_path << "'\n";
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
if (meta_file_stack.empty()) {
|
if (meta_file_stack.empty()) {
|
||||||
std::cerr << "FS error: failed to create temporary meta file stack\n";
|
std::cerr << "FS error: failed to create temporary meta file stack\n";
|
||||||
@ -293,20 +294,19 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Encryption data_enc = Encryption::NONE; // TODO: better defaults
|
||||||
Compression data_comp = Compression::NONE; // TODO: better defaults
|
Compression data_comp = Compression::NONE; // TODO: better defaults
|
||||||
|
if (_reg.all_of<FragComp::DataEncryptionType>(fid)) {
|
||||||
|
data_enc = _reg.get<FragComp::DataEncryptionType>(fid).enc;
|
||||||
|
}
|
||||||
if (_reg.all_of<FragComp::DataCompressionType>(fid)) {
|
if (_reg.all_of<FragComp::DataCompressionType>(fid)) {
|
||||||
data_comp = _reg.get<FragComp::DataCompressionType>(fid).comp;
|
data_comp = _reg.get<FragComp::DataCompressionType>(fid).comp;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::filesystem::path data_tmp_path = _reg.get<FragComp::Ephemeral::FilePath>(fid).path + ".tmp";
|
std::filesystem::path data_tmp_path = _reg.get<FragComp::Ephemeral::FilePath>(fid).path + ".tmp";
|
||||||
data_tmp_path.replace_filename("." + data_tmp_path.filename().generic_u8string());
|
data_tmp_path.replace_filename("." + data_tmp_path.filename().generic_u8string());
|
||||||
std::ofstream data_file{
|
auto data_file_stack = buildFileStackWrite(std::string_view{data_tmp_path.generic_u8string()}, data_enc, data_comp);
|
||||||
data_tmp_path,
|
if (!data_file_stack.empty()) {
|
||||||
std::ios::out | std::ios::trunc | std::ios::binary // always binary, also for text
|
|
||||||
};
|
|
||||||
|
|
||||||
if (!data_file.is_open()) {
|
|
||||||
//meta_file.close();
|
|
||||||
while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
|
while (!meta_file_stack.empty()) { meta_file_stack.pop(); }
|
||||||
std::filesystem::remove(meta_tmp_path);
|
std::filesystem::remove(meta_tmp_path);
|
||||||
std::cerr << "FS error: failed to create temporary data file\n";
|
std::cerr << "FS error: failed to create temporary data file\n";
|
||||||
@ -386,8 +386,10 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
|
|||||||
}
|
}
|
||||||
|
|
||||||
// now data
|
// now data
|
||||||
if (data_comp == Compression::NONE) {
|
//if (data_comp == Compression::NONE) {
|
||||||
std::array<uint8_t, 1024> buffer;
|
// for zstd compression, chunk size is frame size. (no cross frame referencing)
|
||||||
|
static constexpr int64_t chunk_size{1024*1024*10};
|
||||||
|
std::array<uint8_t, chunk_size> buffer;
|
||||||
uint64_t buffer_actual_size {0};
|
uint64_t buffer_actual_size {0};
|
||||||
do {
|
do {
|
||||||
buffer_actual_size = data_cb(buffer.data(), buffer.size());
|
buffer_actual_size = data_cb(buffer.data(), buffer.size());
|
||||||
@ -399,56 +401,57 @@ bool FragmentStore::syncToStorage(FragmentID fid, std::function<write_to_storage
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
data_file.write(reinterpret_cast<const char*>(buffer.data()), buffer_actual_size);
|
data_file_stack.top()->write({buffer.data(), buffer_actual_size});
|
||||||
} while (buffer_actual_size == buffer.size());
|
} while (buffer_actual_size == buffer.size());
|
||||||
} else if (data_comp == Compression::ZSTD) {
|
//} else if (data_comp == Compression::ZSTD) {
|
||||||
std::vector<uint8_t> buffer(ZSTD_CStreamInSize());
|
//std::vector<uint8_t> buffer(ZSTD_CStreamInSize());
|
||||||
std::vector<uint8_t> compressed_buffer(ZSTD_CStreamOutSize());
|
//std::vector<uint8_t> compressed_buffer(ZSTD_CStreamOutSize());
|
||||||
uint64_t buffer_actual_size {0};
|
//uint64_t buffer_actual_size {0};
|
||||||
|
|
||||||
std::unique_ptr<ZSTD_CCtx, decltype(&ZSTD_freeCCtx)> cctx{ZSTD_createCCtx(), &ZSTD_freeCCtx};
|
//std::unique_ptr<ZSTD_CCtx, decltype(&ZSTD_freeCCtx)> cctx{ZSTD_createCCtx(), &ZSTD_freeCCtx};
|
||||||
ZSTD_CCtx_setParameter(cctx.get(), ZSTD_c_compressionLevel, 0); // default (3)
|
//ZSTD_CCtx_setParameter(cctx.get(), ZSTD_c_compressionLevel, 0); // default (3)
|
||||||
ZSTD_CCtx_setParameter(cctx.get(), ZSTD_c_checksumFlag, 1); // add extra checksums (to frames?)
|
//ZSTD_CCtx_setParameter(cctx.get(), ZSTD_c_checksumFlag, 1); // add extra checksums (to frames?)
|
||||||
do {
|
//do {
|
||||||
buffer_actual_size = data_cb(buffer.data(), buffer.size());
|
//buffer_actual_size = data_cb(buffer.data(), buffer.size());
|
||||||
//if (buffer_actual_size == 0) {
|
////if (buffer_actual_size == 0) {
|
||||||
|
////break;
|
||||||
|
////}
|
||||||
|
//if (buffer_actual_size > buffer.size()) {
|
||||||
|
//// wtf
|
||||||
//break;
|
//break;
|
||||||
//}
|
//}
|
||||||
if (buffer_actual_size > buffer.size()) {
|
//bool const lastChunk = (buffer_actual_size < buffer.size());
|
||||||
// wtf
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
bool const lastChunk = (buffer_actual_size < buffer.size());
|
|
||||||
|
|
||||||
ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue;
|
//ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue;
|
||||||
ZSTD_inBuffer input = { buffer.data(), buffer_actual_size, 0 };
|
//ZSTD_inBuffer input = { buffer.data(), buffer_actual_size, 0 };
|
||||||
|
|
||||||
while (input.pos < input.size) {
|
//while (input.pos < input.size) {
|
||||||
ZSTD_outBuffer output = { compressed_buffer.data(), compressed_buffer.size(), 0 };
|
//ZSTD_outBuffer output = { compressed_buffer.data(), compressed_buffer.size(), 0 };
|
||||||
|
|
||||||
size_t const remaining = ZSTD_compressStream2(cctx.get(), &output , &input, mode);
|
//size_t const remaining = ZSTD_compressStream2(cctx.get(), &output , &input, mode);
|
||||||
if (ZSTD_isError(remaining)) {
|
//if (ZSTD_isError(remaining)) {
|
||||||
std::cerr << "FS error: compressing data failed\n";
|
//std::cerr << "FS error: compressing data failed\n";
|
||||||
break;
|
//break;
|
||||||
}
|
//}
|
||||||
|
|
||||||
data_file.write(reinterpret_cast<const char*>(compressed_buffer.data()), output.pos);
|
//data_file.write(reinterpret_cast<const char*>(compressed_buffer.data()), output.pos);
|
||||||
|
|
||||||
if (remaining == 0) {
|
//if (remaining == 0) {
|
||||||
break;
|
//break;
|
||||||
}
|
//}
|
||||||
}
|
//}
|
||||||
// same as if lastChunk break;
|
//// same as if lastChunk break;
|
||||||
} while (buffer_actual_size == buffer.size());
|
//} while (buffer_actual_size == buffer.size());
|
||||||
} else {
|
//} else {
|
||||||
assert(false && "implement me");
|
//assert(false && "implement me");
|
||||||
}
|
//}
|
||||||
|
|
||||||
//meta_file.flush();
|
//meta_file.flush();
|
||||||
//meta_file.close();
|
//meta_file.close();
|
||||||
while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
|
while (!meta_file_stack.empty()) { meta_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
|
||||||
data_file.flush();
|
//data_file.flush();
|
||||||
data_file.close();
|
//data_file.close();
|
||||||
|
while (!data_file_stack.empty()) { data_file_stack.pop(); } // destroy stack // TODO: maybe work with scope?
|
||||||
|
|
||||||
std::filesystem::rename(
|
std::filesystem::rename(
|
||||||
meta_tmp_path,
|
meta_tmp_path,
|
||||||
|
Loading…
Reference in New Issue
Block a user