From ba7342e50ceb99244ddc95fe8f3f9c58be38cb01 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 20 Mar 2024 03:13:29 +0000 Subject: [PATCH] add more compress alg --- source/util/CMakeLists.txt | 1 + source/util/src/tcompression.c | 593 ++++++++++++++++++++------------- 2 files changed, 361 insertions(+), 233 deletions(-) diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index 3008a347ad..83fe30b906 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -19,6 +19,7 @@ target_include_directories( PRIVATE "${TD_SOURCE_DIR}/include/common" PRIVATE "${GRANT_CFG_INCLUDE_DIR}" PRIVATE "${TD_SOURCE_DIR}/utils/TSZ/sz/inc" + PRIVATE "${TD_SOURCE_DIR}/utils/TSZ/zstd/" ) target_link_libraries( util diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index ae4c3f4be1..20bfc56c42 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -54,10 +54,188 @@ #include "tlog.h" #include "tmsg.h" +#include "zlib.h" +#include "zstd.h" + #ifdef TD_TSZ #include "td_sz.h" #endif +int32_t l2ComressInitImpl_disabled(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, + uint32_t intervals, int32_t ifAdtFse, const char *compressor) { + return 0; +} + +int32_t l2CompressImpl_disabled(const char *const input, const int32_t nelements, char *const output, + int32_t outputSize, const char type) { + return 0; +} +int32_t l2DecompressImpl_disabled(const char *const input, const int32_t nelements, char *const output, + int32_t outputSize, const char type) { + return 0; +} +int32_t l2ComressInitImpl_lz4(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, + uint32_t intervals, int32_t ifAdtFse, const char *compressor) { + return 0; +} + +int32_t l2CompressImpl_lz4(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, + const char type) { + const int32_t compressed_data_size = LZ4_compress_default(input, output + 1, inputSize, outputSize - 1); + + // If cannot compress or after compression, data becomes larger. + if (compressed_data_size <= 0 || compressed_data_size > inputSize) { + /* First byte is for indicator */ + output[0] = 0; + memcpy(output + 1, input, inputSize); + return inputSize + 1; + } + output[0] = 1; + return compressed_data_size + 1; +} +int32_t l2DecompressImpl_lz4(const char *const input, const int32_t compressedSize, char *const output, + int32_t outputSize, const char type) { + if (input[0] == 1) { + /* It is compressed by LZ4 algorithm */ + const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize); + if (decompressed_size < 0) { + uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size); + return -1; + } + + return decompressed_size; + } else if (input[0] == 0) { + /* It is not compressed by LZ4 algorithm */ + memcpy(output, input + 1, compressedSize - 1); + return compressedSize - 1; + } else if (input[1] == 2) { + uError("Invalid decompress string indicator:%d", input[0]); + return -1; + } + return -1; +} +int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, + uint32_t intervals, int32_t ifAdtFse, const char *compressor) { + return 0; +} +int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, + const char type) { + uLongf dstLen = outputSize - 1; + int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 0); + if (ret == Z_OK) { + output[0] = 1; + return dstLen + 1; + } else if (ret == Z_MEM_ERROR) { + output[0] = 0; + memcpy(output + 1, input, inputSize); + return inputSize + 1; + + } else if (ret == Z_STREAM_ERROR) { + output[0] = 0; + memcpy(output + 1, input, inputSize); + return inputSize + 1; + } + return 0; +} +int32_t l2DecompressImpl_zlib(const char *const input, const int32_t compressedSize, char *const output, + int32_t outputSize, const char type) { + if (input[0] == 1) { + uLongf len = 0; + int ret = uncompress((Bytef *)output, &len, (Bytef *)input + 1, compressedSize - 1); + if (ret == Z_OK) { + return len; + } else { + return -1; + } + + } else if (input[0] == 0) { + /* It is not compressed by LZ4 algorithm */ + memcpy(output, input + 1, compressedSize - 1); + return compressedSize - 1; + } else if (input[1] == 2) { + uError("Invalid decompress string indicator:%d", input[0]); + return -1; + } + return 0; +} +int32_t l2ComressInitImpl_zstd(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, + uint32_t intervals, int32_t ifAdtFse, const char *compressor) { + return 0; +} + +int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, + const char type) { + size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, 0); + if (len > inputSize) { + output[0] = 0; + memcpy(output + 1, input, inputSize); + return inputSize + 1; + } + output[0] = 1; + + return len + 1; +} +int32_t l2DecompressImpl_zstd(const char *const input, const int32_t compressedSize, char *const output, + int32_t outputSize, const char type) { + if (input[0] == 1) { + size_t len = ZSTD_decompress(output, outputSize, input + 1, compressedSize - 1); + return len; + } else if (input[0] == 0) { + memcpy(output, input + 1, compressedSize - 1); + return compressedSize - 1; + } + return -1; +} +int32_t l2ComressInitImpl_tsz(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, + uint32_t intervals, int32_t ifAdtFse, const char *compressor) { + return 0; +} +int32_t l2CompressImpl_tsz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, + const char type) { + if (type == TSDB_DATA_TYPE_FLOAT) { + if (lossyFloat) { + return tsCompressFloatLossyImp(input, inputSize, output); + } + } else if (type == TSDB_DATA_TYPE_DOUBLE) { + if (lossyDouble) { + return tsCompressDoubleLossyImp(input, inputSize, output); + } + } + + return l2CompressImpl_lz4(input, inputSize, output, outputSize, type); +} +int32_t l2DecompressImpl_tsz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, + const char type) { + if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { + if (HEAD_ALGO(((uint8_t *)input)[0]) == ALGO_SZ_LOSSY) { + return tsDecompressFloatLossyImp(input, inputSize, outputSize, output); + } + } + + return l2DecompressImpl_lz4(input, inputSize, output, outputSize, type); +} + +int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, + uint32_t intervals, int32_t ifAdtFse, const char *compressor) { + return 0; +} +int32_t l2CompressImpl_xz(const char *const input, const int32_t nelements, char *const output, int32_t outputSize, + const char type) { + return 0; +} +int32_t l2DecompressImpl_xz(const char *const input, const int32_t nelements, char *const output, int32_t outputSize, + const char type) { + return 0; +} + +TCompressL1FnSet compressL1Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}}; +TCompressL2FnSet compressL2Dict[] = {{l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled}, + {l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}, + {l2ComressInitImpl_zlib, l2CompressImpl_zlib, l2DecompressImpl_zlib}, + {l2ComressInitImpl_zstd, l2CompressImpl_zstd, l2DecompressImpl_zstd}, + {l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz}, + {l2ComressInitImpl_xz, l2CompressImpl_xz, l2DecompressImpl_xz}}; + static const int32_t TEST_NUMBER = 1; #define is_bigendian() ((*(char *)&TEST_NUMBER) == 0) #define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL) @@ -422,14 +600,91 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c /* It is not compressed by LZ4 algorithm */ memcpy(output, input + 1, compressedSize - 1); return compressedSize - 1; - } else { + } else if (input[1] == 2) { uError("Invalid decompress string indicator:%d", input[0]); return -1; } + return -1; +} +// refactor later +int32_t tsCompressStringImp2(const char *const input, int32_t inputSize, char *const output, int32_t outputSize, + int8_t type) { + // Try to compress using LZ4 algorithm. + + // if (type == L2_LZ4) { + // const int32_t compressed_data_size = LZ4_compress_default(input, output + 1, inputSize, outputSize - 1); + + // // If cannot compress or after compression, data becomes larger. + // if (compressed_data_size <= 0 || compressed_data_size > inputSize) { + // /* First byte is for indicator */ + // output[0] = 0; + // memcpy(output + 1, input, inputSize); + // return inputSize + 1; + // } + + // output[0] = 1; + // return compressed_data_size + 1; + // } else if (type == L2_ZLIB) { + // uLongf dstLen = outputSize - 1; + // int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 0); + // if (ret == Z_OK) { + // output[0] = 1; + // return dstLen + 1; + // } else if (ret == Z_MEM_ERROR) { + // output[0] = 0; + // memcpy(output + 1, input, inputSize); + // return inputSize + 1; + + // } else if (ret == Z_STREAM_ERROR) { + // output[0] = 0; + // memcpy(output + 1, input, inputSize); + // return inputSize + 1; + // } + // } else if (type == L2_ZSTD) { + // } else if (type == L2_TSZ) { + // } else if (type == L2_XZ) { + // } else if (type == L2_DISABLED) { + // ASSERT(0); + // } + return -1; +} + +int32_t tsDecompressStringImp2(const char *const input, int32_t compressedSize, char *const output, int32_t outputSize, + int8_t type) { + // compressedSize is the size of data after compression. + l2CompressImpl_zstd(NULL, 0, 0, 0, 0); + return 0; + + // if (input[0] == 1) { + // /* It is compressed by LZ4 algorithm */ + // if (type == L2_LZ4) { + // const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize); + // if (decompressed_size < 0) { + // uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size); + // return -1; + // } + // return decompressed_size; + // } else if (type == L2_ZLIB) { + // } else if (type == L2_ZSTD) { + // } else if (type == L2_TSZ) { + // } else if (type == L2_XZ) { + // } else if (type == L2_DISABLED) { + // ASSERT(0); + // } + // } else if (input[0] == 0) { + // /* It is not compressed by LZ4 algorithm */ + // memcpy(output, input + 1, compressedSize - 1); + // return compressedSize - 1; + // } else if (input[1] == 2) { + // uError("Invalid decompress string indicator:%d", input[0]); + // return -1; + // } + // return -1; } /* --------------------------------------------Timestamp Compression ---------------------------------------------- */ // TODO: Take care here, we assumes little endian encoding. +// int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) { int32_t _pos = 1; int32_t longBytes = LONG_BYTES; @@ -2395,319 +2650,194 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int } } +#define FUNC_COMPRESS_IMPL1(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \ + do { \ + DEFINE_VAR(cmprAlg) \ + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \ + return l1Func(pIn, nEle, pOut); \ + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ + int32_t len = l1Func(pIn, nEle, pBuf); \ + if (compress) { \ + return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \ + } else { \ + return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \ + } \ + } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ + ASSERT(0); \ + } else { \ + ASSERT(0); \ + } \ + return -1; \ + } while (1) + +#define FUNC_COMPRESS_IMPL2(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \ + do { \ + DEFINE_VAR(cmprAlg) \ + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \ + return l1Func(pIn, nEle, pOut, type); \ + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ + int32_t len = l1Func(pIn, nEle, pBuf, type); \ + if (compress) { \ + return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \ + } else { \ + return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \ + } \ + } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ + ASSERT(0); \ + } else { \ + ASSERT(0); \ + } \ + return -1; \ + } while (1) +#define FUNC_COMPRESS_IMPL3(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \ + do { \ + if (type != TSDB_DATA_TYPE_FLOAT && type != TSDB_DATA_RTYPE_DOUBLE { \ + return -1; \ + } \ + DEFINE_VAR(cmprAlg) \ + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \ + return l1Func(pIn, nEle, pOut, type); \ + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ + int32_t len = l1Func(pIn, nEle, pBuf, type); \ + if (compress) { \ + return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \ + } else { \ + return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \ + } \ + } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ + ASSERT(0); \ + } else { \ + ASSERT(0); \ + } \ + return -1; \ + } while (1) + +// typedef int32_t (*__compress_fn)(const char *input, const int32_t nEle, char *const output); /************************************************************************* * REGULAR COMPRESSION 2 *************************************************************************/ // Timestamp ===================================================== int32_t tsCompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - DEFINE_VAR(cmprAlg) - - if (l1 != L1_DISABLED && l2 == L2_DISABLED) { - return tsCompressTimestampImp(pIn, nEle, pOut); - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { - int32_t len = tsCompressTimestampImp(pIn, nEle, pBuf); - return tsCompressStringImp(pBuf, len, pOut, nOut); - } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { - ASSERT(0); - } else if (l1 == L1_DISABLED && l2 == L2_DISABLED) { - ASSERT(0); - } - return -1; + FUNC_COMPRESS_IMPL1(tsCompressTimestampImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TIMESTAMP, + 1); } int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - DEFINE_VAR(cmprAlg) - if (l1 != L1_DISABLED && l2 == L2_DISABLED) { - return tsDecompressTimestampImp(pIn, nEle, pOut); - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { - int32_t len = tsDecompressStringImp(pIn, nIn, pBuf, nBuf); - if (len < 0) return -1; - return tsDecompressTimestampImp(pBuf, nEle, pOut); - } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { - ASSERT(0); - } else if (l1 == L1_DISABLED && l2 == L2_DISABLED) { - ASSERT(0); - } - - return 0; + FUNC_COMPRESS_IMPL1(tsDecompressTimestampImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, + TSDB_DATA_TYPE_TIMESTAMP, 0); } // Float ===================================================== int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - return 0; - // #ifdef TD_TSZ - // // lossy mode - // if (lossyFloat) { - // return tsCompressFloatLossyImp(pIn, nEle, pOut); - // // lossless mode - // } else { - // #endif - // if (cmprAlg == ONE_STAGE_COMP) { - // return tsCompressFloatImp(pIn, nEle, pOut); - // } else if (cmprAlg == TWO_STAGE_COMP) { - // int32_t len = tsCompressFloatImp(pIn, nEle, pBuf); - // return tsCompressStringImp(pBuf, len, pOut, nOut); - // } else { - // ASSERTS(0, "compress algo invalid"); - // return -1; - // } - // #ifdef TD_TSZ - // } - // #endif + // if (lossyFloat) { + // return tsCompressFloatLossyImp(pIn, nEle, pOut); + // } else { + // FUNC_COMPRESS_IMPL1(tsCompressFloatImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1); + if (lossyFloat) { + return tsCompressFloatLossyImp(pIn, nEle, pOut); + } + FUNC_COMPRESS_IMPL1(tsCompressFloatImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1); } int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - return 0; - // #ifdef TD_TSZ - // if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) { - // // decompress lossy - // return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut); - // } else { - // #endif - // // decompress lossless - // if (cmprAlg == ONE_STAGE_COMP) { - // return tsDecompressFloatImp(pIn, nEle, pOut); - // } else if (cmprAlg == TWO_STAGE_COMP) { - // if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; - // return tsDecompressFloatImp(pBuf, nEle, pOut); - // } else { - // ASSERTS(0, "compress algo invalid"); - // return -1; - // } - // #ifdef TD_TSZ - // } - // #endif + if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) { + return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut); + } + FUNC_COMPRESS_IMPL1(tsDecompressFloatImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 0); } // Double ===================================================== int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - return 0; - // #ifdef TD_TSZ - // if (lossyDouble) { - // // lossy mode - // return tsCompressDoubleLossyImp(pIn, nEle, pOut); - // } else { - // #endif - // // lossless mode - // if (cmprAlg == ONE_STAGE_COMP) { - // return tsCompressDoubleImp(pIn, nEle, pOut); - // } else if (cmprAlg == TWO_STAGE_COMP) { - // int32_t len = tsCompressDoubleImp(pIn, nEle, pBuf); - // return tsCompressStringImp(pBuf, len, pOut, nOut); - // } else { - // ASSERTS(0, "compress algo invalid"); - // return -1; - // } - // #ifdef TD_TSZ - // } - // #endif + if (lossyDouble) { + // lossy mode + return tsCompressDoubleLossyImp(pIn, nEle, pOut); + } + FUNC_COMPRESS_IMPL1(tsCompressDoubleImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 1); } int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - return 0; - // #ifdef TD_TSZ - // if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) { - // // decompress lossy - // return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut); - // } else { - // #endif - // // decompress lossless - // if (cmprAlg == ONE_STAGE_COMP) { - // return tsDecompressDoubleImp(pIn, nEle, pOut); - // } else if (cmprAlg == TWO_STAGE_COMP) { - // if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; - // return tsDecompressDoubleImp(pBuf, nEle, pOut); - // } else { - // ASSERTS(0, "compress algo invalid"); - // return -1; - // } - // #ifdef TD_TSZ - // } - // #endif + if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) { + // decompress lossy + return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut); + } + FUNC_COMPRESS_IMPL1(tsDecompressDoubleImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 0); } // Binary ===================================================== int32_t tsCompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - return tsCompressStringImp(pIn, nIn, pOut, nOut); + DEFINE_VAR(cmprAlg) + return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, TSDB_DATA_TYPE_BINARY); } int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { // return 0; - return tsDecompressStringImp(pIn, nIn, pOut, nOut); + DEFINE_VAR(cmprAlg) + return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, TSDB_DATA_TYPE_BINARY); } // Bool ===================================================== int32_t tsCompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - DEFINE_VAR(cmprAlg) - if (l1 != L1_DISABLED && l2 == L2_DISABLED) { - return tsCompressBoolImp(pIn, nEle, pOut); - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { - int32_t len = tsCompressBoolImp(pIn, nIn, pBuf); - if (len < 0) return -1; - return tsCompressStringImp(pBuf, len, pOut, nOut); - } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { - ASSERT(0); - } else if (l1 == L1_DISABLED && l2 == L2_DISABLED) { - ASSERT(0); - } - return -1; + FUNC_COMPRESS_IMPL1(tsCompressBoolImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 1); } int32_t tsDecompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - DEFINE_VAR(cmprAlg) - if (l1 != L1_DISABLED && l2 == L2_DISABLED) { - return tsDecompressBoolImp(pIn, nEle, pOut); - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { - if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; - return tsDecompressBoolImp(pBuf, nEle, pOut); - } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { - return -1; - } else if (l2 == L1_DISABLED && l2 == L2_DISABLED) { - return -1; - } - return -1; + FUNC_COMPRESS_IMPL1(tsDecompressBoolImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 0); } // Tinyint ===================================================== int32_t tsCompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - return 0; - // if (cmprAlg == ONE_STAGE_COMP) { - // return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT); - // } else if (cmprAlg == TWO_STAGE_COMP) { - // int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_TINYINT); - // return tsCompressStringImp(pBuf, len, pOut, nOut); - // } else { - // ASSERTS(0, "compress algo invalid"); - // return -1; - // } + FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 1); } int32_t tsDecompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - return 0; - // if (cmprAlg == ONE_STAGE_COMP) { - // return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT); - // } else if (cmprAlg == TWO_STAGE_COMP) { - // if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; - // return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_TINYINT); - // } else { - // ASSERTS(0, "compress algo invalid"); - // return -1; - // } + FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 0); } // Smallint ===================================================== int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - return 0; - // if (cmprAlg == ONE_STAGE_COMP) { - // return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT); - // } else if (cmprAlg == TWO_STAGE_COMP) { - // int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_SMALLINT); - // return tsCompressStringImp(pBuf, len, pOut, nOut); - // } else { - // ASSERTS(0, "compress algo invalid"); - // return -1; - // } + FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 1); } int32_t tsDecompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - return 0; - // if (cmprAlg == ONE_STAGE_COMP) { - // return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT); - // } else if (cmprAlg == TWO_STAGE_COMP) { - // if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; - // return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_SMALLINT); - // } else { - // ASSERTS(0, "compress algo invalid"); - // return -1; - // } + FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 0); } // Int ===================================================== int32_t tsCompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - DEFINE_VAR(cmprAlg) - - if (l1 != L1_DISABLED && l2 == L2_DISABLED) { - return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT); - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { - int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_INT); - return tsCompressStringImp(pBuf, len, pOut, nOut); - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { - ASSERTS(0, "compress algo invalid"); - return -1; - } else if (l1 != L1_DISABLED && l2 == L2_DISABLED) { - ASSERTS(0, "compress algo invalid"); - return -1; - } - return -1; + FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 1); } int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - DEFINE_VAR(cmprAlg) - if (l1 != L1_DISABLED && l2 == L2_DISABLED) { - return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT); - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { - if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; - return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_INT); - } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { - ASSERTS(0, "compress algo invalid"); - return -1; - } else if (l1 == L1_DISABLED && l2 == L2_DISABLED) { - return -1; - } - return -1; + FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 0); } // Bigint ===================================================== int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - DEFINE_VAR(cmprAlg) - if (l1 != L1_DISABLED && l2 == L2_DISABLED) { - return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT); - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { - int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_BIGINT); - return tsCompressStringImp(pBuf, len, pOut, nOut); - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { - ASSERTS(0, "compress algo invalid"); - return -1; - } else if (l1 != L1_DISABLED && l2 == L2_DISABLED) { - ASSERTS(0, "compress algo invalid"); - return -1; - } - return 0; + FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 1); } int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - DEFINE_VAR(cmprAlg) - if (l1 != L1_DISABLED && l2 == L2_DISABLED) { - return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT); - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { - if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; - return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT); - } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { - ASSERTS(0, "compress algo invalid"); - return -1; - } else if (l1 == L1_DISABLED && l2 == L2_DISABLED) { - return -1; - } - return -1; + FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0); } + int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn); int32_t tsCompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, @@ -2751,9 +2881,6 @@ int32_t tsDecompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void return len; } -TCompressL1FnSet compressL1Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}}; -TCompressL2FnSet compressL2Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}}; - int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn) { int8_t l1 = COMPRESS_L1_TYPE_U8(compress); int8_t l2 = COMPRESS_L2_TYPE_U8(compress);