diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 36cd84c332..a39df813a4 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -121,7 +121,7 @@ int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPre int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, const char type) { uLongf dstLen = outputSize - 1; - int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 0); + int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 9); if (ret == Z_OK) { output[0] = 1; return dstLen + 1; @@ -140,7 +140,7 @@ int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, ch int32_t l2DecompressImpl_zlib(const char *const input, const int32_t compressedSize, char *const output, int32_t outputSize, const char type) { if (input[0] == 1) { - uLongf len = 0; + uLongf len = outputSize; int ret = uncompress((Bytef *)output, &len, (Bytef *)input + 1, compressedSize - 1); if (ret == Z_OK) { return len; @@ -165,7 +165,7 @@ int32_t l2ComressInitImpl_zstd(char *lossyColumns, float fPrecision, double dPre int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, const char type) { - size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, 0); + size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, ZSTD_CLEVEL_DEFAULT); if (len > inputSize) { output[0] = 0; memcpy(output + 1, input, inputSize); @@ -178,8 +178,7 @@ int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, ch int32_t l2DecompressImpl_zstd(const char *const input, const int32_t compressedSize, char *const output, int32_t outputSize, const char type) { if (input[0] == 1) { - size_t len = ZSTD_decompress(output, outputSize, input + 1, compressedSize - 1); - return len; + return ZSTD_decompress(output, outputSize, input + 1, compressedSize - 1); } else if (input[0] == 0) { memcpy(output, input + 1, compressedSize - 1); return compressedSize - 1; @@ -2656,11 +2655,12 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \ return l1Func(pIn, nEle, pOut); \ } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ - int32_t len = l1Func(pIn, nEle, pBuf); \ if (compress) { \ + int32_t len = l1Func(pIn, nEle, pBuf); \ return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \ } else { \ - return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \ + if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \ + return l1Func(pBuf, nEle, pOut); \ } \ } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ ASSERT(0); \ @@ -2676,11 +2676,12 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \ return l1Func(pIn, nEle, pOut, type); \ } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ - int32_t len = l1Func(pIn, nEle, pBuf, type); \ if (compress) { \ + int32_t len = l1Func(pIn, nEle, pBuf, type); \ return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \ } else { \ - return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \ + if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \ + return l1Func(pBuf, nEle, pOut, type); \ } \ } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ ASSERT(0); \ @@ -2689,28 +2690,29 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int } \ return -1; \ } while (1) -#define FUNC_COMPRESS_IMPL3(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \ - do { \ - if (type != TSDB_DATA_TYPE_FLOAT && type != TSDB_DATA_RTYPE_DOUBLE { \ - return -1; \ - } \ - DEFINE_VAR(cmprAlg) \ - if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \ - return l1Func(pIn, nEle, pOut, type); \ - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ - int32_t len = l1Func(pIn, nEle, pBuf, type); \ - if (compress) { \ - return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \ - } else { \ - return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \ - } \ - } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ - ASSERT(0); \ - } else { \ - ASSERT(0); \ - } \ - return -1; \ - } while (1) + +// #define FUNC_COMPRESS_IMPL3(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \ +// do { \ +// if (type != TSDB_DATA_TYPE_FLOAT && type != TSDB_DATA_RTYPE_DOUBLE { \ +// return -1; \ +// } \ +// DEFINE_VAR(cmprAlg) \ +// if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \ +// return l1Func(pIn, nEle, pOut, type); \ +// } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ +// int32_t len = l1Func(pIn, nEle, pBuf, type); \ +// if (compress) { \ +// return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \ +// } else { \ +// return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \ +// } \ +// } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ +// ASSERT(0); \ +// } else { \ +// ASSERT(0); \ +// } \ +// return -1; \ +// } while (1) // typedef int32_t (*__compress_fn)(const char *input, const int32_t nEle, char *const output); /************************************************************************* diff --git a/source/util/test/decompressTest.cpp b/source/util/test/decompressTest.cpp index 1a614be36e..359432bef5 100644 --- a/source/util/test/decompressTest.cpp +++ b/source/util/test/decompressTest.cpp @@ -111,34 +111,64 @@ void setColLevel(uint32_t* compress, uint8_t level) { *compress |= level; return; } -TEST(utilTest, zstdtest) { - int32_t num = 10000; +void compressImplTest(void* pVal, int8_t type, int32_t sz, uint32_t cmprAlg) { + { + int64_t* pList = (int64_t*)pVal; + int32_t num = sz; + + char* px = static_cast(taosMemoryMalloc(num * sizeof(int64_t))); + char* pBuf = static_cast(taosMemoryMalloc(num * sizeof(int64_t) + 64)); + + int32_t len = + tsCompressTimestamp2(pList, num * sizeof(int64_t), num, px, num, cmprAlg, pBuf, num * sizeof(int64_t) + 64); + + char* pOutput = static_cast(taosMemoryCalloc(1, num * sizeof(int64_t))); + memset(pBuf, 0, num * sizeof(int64_t) + 64); + int32_t size = + tsDecompressTimestamp2(px, len, num, pOutput, sizeof(int64_t) * num, cmprAlg, pBuf, num * sizeof(int64_t) + 64); + printf("size: %d\n", size); + for (int i = 0; i < num; i++) { + int64_t val = *(int64_t*)(pOutput + i * sizeof(int64_t)); + int32_t ival = val; + if (i < 100) printf("val = %d\n", ival); + } + taosMemoryFree(px); + taosMemoryFree(pBuf); + taosMemoryFree(pOutput); + } +} +TEST(utilTest, zstdtest) { + int32_t num = 10000; int64_t* pList = static_cast(taosMemoryCalloc(num, sizeof(int64_t))); - int64_t iniVal = 1700000000; + int64_t iniVal = 17000; uint32_t v = 100; for (int32_t i = 0; i < num; ++i) { - iniVal += num; + iniVal += i; pList[i] = iniVal; } - uint32_t cmprAlg = 0; - setColCompress(&cmprAlg, 1); - setColEncode(&cmprAlg, 1); + { + uint32_t cmprAlg = 0; + setColCompress(&cmprAlg, 1); + setColEncode(&cmprAlg, 1); - char* pOutput = static_cast(taosMemoryMalloc(num * sizeof(int64_t))); + compressImplTest((void*)pList, 0, num, cmprAlg); + } - int32_t bufSize = num * sizeof(int64_t) + 64; - char* pBuf = static_cast(taosMemoryMalloc(bufSize)); + { + uint32_t cmprAlg = 0; + setColCompress(&cmprAlg, 2); + setColEncode(&cmprAlg, 1); - int32_t sz = - tsCompressTimestamp2(pList, num * sizeof(int64_t), num, pOutput, num * sizeof(int64_t), cmprAlg, pBuf, bufSize); - printf("compress size: %d", sz); + compressImplTest((void*)pList, 0, num, cmprAlg); + } + { + uint32_t cmprAlg = 0; + setColCompress(&cmprAlg, 3); + setColEncode(&cmprAlg, 1); - tsDecompressTimestamp(pOutput, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf, int32_t nBuf) - - - - + compressImplTest((void*)pList, 0, num, cmprAlg); + } }