From 66d9447733d2e14873360af7174f989c5e64172a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 24 Mar 2024 08:15:01 +0000 Subject: [PATCH] refactor code --- include/util/tcompression.h | 2 +- source/common/src/tdataformat.c | 7 ++- source/util/src/tcompression.c | 77 ++++++++++++++++++--------------- 3 files changed, 47 insertions(+), 39 deletions(-) diff --git a/include/util/tcompression.h b/include/util/tcompression.h index d83fefcce1..c77862066e 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -227,7 +227,7 @@ typedef int32_t (*__data_decompress_l1_fn_t)(const char *const input, const int3 const char type); typedef int32_t (*__data_compress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output, - int32_t outputSize, const char type); + int32_t outputSize, const char type, int8_t level); typedef int32_t (*__data_decompress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output, int32_t outputSize, const char type); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index af4d404aa0..4689daa347 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -4231,7 +4231,8 @@ int32_t tCompressData(void *input, // input extraSizeNeeded = (info->cmprAlg == NO_COMPRESSION) ? info->originalSize : info->originalSize + COMP_OVERFLOW_BYTES; ASSERT(outputSize >= extraSizeNeeded); - if (info->cmprAlg == NO_COMPRESSION) { + DEFINE_VAR(info->cmprAlg) + if (info->cmprAlg == NO_COMPRESSION || (l1 == L1_DISABLED && l2 == L2_DISABLED)) { memcpy(output, input, info->originalSize); info->compressedSize = info->originalSize; } else if (info->cmprAlg == TWO_STAGE_COMP) { @@ -4307,7 +4308,9 @@ int32_t tDecompressData(void *input, // input ASSERT(outputSize >= info->originalSize); - if (info->cmprAlg == NO_COMPRESSION) { + DEFINE_VAR(info->cmprAlg); + + if (info->cmprAlg == NO_COMPRESSION || (l1 == L1_DISABLED && l2 == L2_DISABLED)) { ASSERT(info->compressedSize == info->originalSize); memcpy(output, input, info->compressedSize); } else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) { diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index cf5da07ccd..8b7e342673 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -90,7 +90,7 @@ int32_t l2ComressInitImpl_disabled(char *lossyColumns, float fPrecision, double } int32_t l2CompressImpl_disabled(const char *const input, const int32_t nelements, char *const output, - int32_t outputSize, const char type) { + int32_t outputSize, const char type, int8_t lvl) { return 0; } int32_t l2DecompressImpl_disabled(const char *const input, const int32_t nelements, char *const output, @@ -103,7 +103,7 @@ int32_t l2ComressInitImpl_lz4(char *lossyColumns, float fPrecision, double dPrec } int32_t l2CompressImpl_lz4(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, - const char type) { + const char type, int8_t lvl) { const int32_t compressed_data_size = LZ4_compress_default(input, output + 1, inputSize, outputSize - 1); // If cannot compress or after compression, data becomes larger. @@ -142,7 +142,7 @@ int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPre return 0; } int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, - const char type) { + const char type, int8_t lvl) { uLongf dstLen = outputSize - 1; int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 9); if (ret == Z_OK) { @@ -182,7 +182,7 @@ int32_t l2ComressInitImpl_zstd(char *lossyColumns, float fPrecision, double dPre } int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, - const char type) { + const char type, int8_t lvl) { size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, ZSTD_CLEVEL_DEFAULT); if (len > inputSize) { output[0] = 0; @@ -208,7 +208,7 @@ int32_t l2ComressInitImpl_tsz(char *lossyColumns, float fPrecision, double dPrec return 0; } int32_t l2CompressImpl_tsz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, - const char type) { + const char type, int8_t lvl) { if (type == TSDB_DATA_TYPE_FLOAT) { if (lossyFloat) { return tsCompressFloatLossyImp(input, inputSize, output); @@ -219,7 +219,7 @@ int32_t l2CompressImpl_tsz(const char *const input, const int32_t inputSize, cha } } - return l2CompressImpl_lz4(input, inputSize, output, outputSize, type); + return l2CompressImpl_lz4(input, inputSize, output, outputSize, type, lvl); } int32_t l2DecompressImpl_tsz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, @@ -238,7 +238,7 @@ int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPreci return 0; } int32_t l2CompressImpl_xz(const char *const input, const int32_t nelements, char *const output, int32_t outputSize, - const char type) { + const char type, int8_t lvl) { return 0; } int32_t l2DecompressImpl_xz(const char *const input, const int32_t nelements, char *const output, int32_t outputSize, @@ -2635,33 +2635,38 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int } } -#define FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \ - do { \ - DEFINE_VAR(cmprAlg) \ - if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \ - if (compress) { \ - return compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \ - } else { \ - return compressL1Dict[l1].decomprFn(pIn, nEle, pBuf, type); \ - } \ - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ - if (compress) { \ - int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \ - return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \ - } else { \ - if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \ - return compressL1Dict[l1].decomprFn(pBuf, nEle, pOut, type); \ - } \ - } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ - if (compress) { \ - return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type); \ - } else { \ - return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, type); \ - } \ - } else { \ - ASSERT(0); \ - } \ - return -1; \ +#define FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \ + do { \ + DEFINE_VAR(cmprAlg) \ + if (compress) { \ + uTrace("encode:%s, compress:%s, level:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl); \ + } else { \ + uTrace("decode:%s, decompress:%s, level:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl); \ + } \ + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \ + if (compress) { \ + return compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \ + } else { \ + return compressL1Dict[l1].decomprFn(pIn, nEle, pBuf, type); \ + } \ + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ + if (compress) { \ + int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \ + return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, lvl); \ + } else { \ + if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \ + return compressL1Dict[l1].decomprFn(pBuf, nEle, pOut, type); \ + } \ + } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ + if (compress) { \ + return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, lvl); \ + } else { \ + return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, type); \ + } \ + } else { \ + ASSERT(0); \ + } \ + return -1; \ } while (1) /************************************************************************* @@ -2718,7 +2723,7 @@ int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, in int32_t tsCompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { DEFINE_VAR(cmprAlg) - return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, TSDB_DATA_TYPE_BINARY); + return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, TSDB_DATA_TYPE_BINARY, lvl); } int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, @@ -2801,7 +2806,7 @@ int32_t tsCompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void * len = fn1.comprFn(pIn, nEle, pOut, type); } else { len = fn1.comprFn(pIn, nEle, pBuf, type); - len = fn2.comprFn(pBuf, len, pOut, nOut, type); + len = fn2.comprFn(pBuf, len, pOut, nOut, type, lvl); } return len; }