From f44734707db375c6389c9cd36978fa3475fb078c Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 24 Jul 2024 04:40:11 +0000 Subject: [PATCH] refactor compress --- source/util/src/tcompression.c | 91 ++++++++++++++++++++-------------- source/util/src/tdecompress.c | 4 +- 2 files changed, 56 insertions(+), 39 deletions(-) diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 884d7ea1b6..365585f55e 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -130,7 +130,7 @@ int32_t l2DecompressImpl_lz4(const char *const input, const int32_t compressedSi const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize); if (decompressed_size < 0) { uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size); - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } return decompressed_size; @@ -140,9 +140,9 @@ int32_t l2DecompressImpl_lz4(const char *const input, const int32_t compressedSi return compressedSize - 1; } else if (input[1] == 2) { uError("Invalid decompress string indicator:%d", input[0]); - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } int32_t l2ComressInitImpl_tsz(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals, int32_t ifAdtFse, const char *compressor) { @@ -195,7 +195,7 @@ int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, ch memcpy(output + 1, input, inputSize); return inputSize + 1; } - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } int32_t l2DecompressImpl_zlib(const char *const input, const int32_t compressedSize, char *const output, int32_t outputSize, const char type) { @@ -205,7 +205,7 @@ int32_t l2DecompressImpl_zlib(const char *const input, const int32_t compressedS if (ret == Z_OK) { return len; } else { - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } } else if (input[0] == 0) { @@ -214,7 +214,7 @@ int32_t l2DecompressImpl_zlib(const char *const input, const int32_t compressedS return compressedSize - 1; } else if (input[1] == 2) { uError("Invalid decompress string indicator:%d", input[0]); - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } return 0; } @@ -243,7 +243,7 @@ int32_t l2DecompressImpl_zstd(const char *const input, const int32_t compressedS memcpy(output, input + 1, compressedSize - 1); return compressedSize - 1; } - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, @@ -269,7 +269,7 @@ int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSiz memcpy(output, input + 1, compressedSize - 1); return compressedSize - 1; } - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } #endif @@ -333,7 +333,7 @@ int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, tdszInit(fPrecision, dPrecision, maxIntervals, intervals, ifAdtFse, compressor); if (lossyFloat) uTrace("lossy compression float is opened. "); if (lossyDouble) uTrace("lossy compression double is opened. "); - return 1; + return 0; } // exit call void tsCompressExit() { tdszExit(); } @@ -559,7 +559,7 @@ int32_t tsCompressBoolImp(const char *const input, const int32_t nelements, char output[pos] |= t; } else { uError("Invalid compress bool value:%d", output[pos]); - return -1; + return TSDB_CODE_INVALID_PARA; } } @@ -600,7 +600,7 @@ int32_t tsCompressDoubleImp2(const char *const input, const int32_t nelements, c } else if (type == TSDB_DATA_TYPE_DOUBLE) { return tsCompressDoubleImp(input, nelements, output); } - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } int32_t tsDecompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type) { if (type == TSDB_DATA_TYPE_FLOAT) { @@ -608,7 +608,7 @@ int32_t tsDecompressDoubleImp2(const char *const input, const int32_t nelements, } else if (type == TSDB_DATA_TYPE_DOUBLE) { return tsDecompressDoubleImp(input, nelements, output); } - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } int32_t tsCompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type) { return tsCompressINTImp(input, nelements, output, type); @@ -696,7 +696,7 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize); if (decompressed_size < 0) { uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size); - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } return decompressed_size; @@ -706,9 +706,9 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c return compressedSize - 1; } else if (input[1] == 2) { uError("Invalid decompress string indicator:%d", input[0]); - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } /* --------------------------------------------Timestamp Compression ---------------------------------------------- */ @@ -2468,7 +2468,7 @@ int32_t tsCompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_ return tsCompressStringImp(pBuf, len, pOut, nOut); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } } @@ -2487,7 +2487,7 @@ int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3 return tsDecompressFloatImp(pBuf, nEle, pOut); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } } @@ -2507,7 +2507,7 @@ int32_t tsCompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32 return tsCompressStringImp(pBuf, len, pOut, nOut); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } } @@ -2526,7 +2526,7 @@ int32_t tsDecompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int return tsDecompressDoubleImp(pBuf, nEle, pOut); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } } @@ -2550,25 +2550,26 @@ int32_t tsCompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t } else if (cmprAlg == TWO_STAGE_COMP) { int32_t len = tsCompressBoolImp(pIn, nEle, pBuf); if (len < 0) { - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } return tsCompressStringImp(pBuf, len, pOut, nOut); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } } int32_t tsDecompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf, int32_t nBuf) { + int32_t code = 0; if (cmprAlg == ONE_STAGE_COMP) { return tsDecompressBoolImp(pIn, nEle, pOut); } else if (cmprAlg == TWO_STAGE_COMP) { - if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code; return tsDecompressBoolImp(pBuf, nEle, pOut); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } @@ -2579,23 +2580,27 @@ int32_t tsCompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3 return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT); } else if (cmprAlg == TWO_STAGE_COMP) { int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_TINYINT); + if (len < 0) { + return len; + } return tsCompressStringImp(pBuf, len, pOut, nOut); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } int32_t tsDecompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf, int32_t nBuf) { + int32_t code = 0; if (cmprAlg == ONE_STAGE_COMP) { return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT); } else if (cmprAlg == TWO_STAGE_COMP) { - if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code; return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_TINYINT); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } @@ -2606,23 +2611,27 @@ int32_t tsCompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT); } else if (cmprAlg == TWO_STAGE_COMP) { int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_SMALLINT); + if (len < 0) { + return 0; + } return tsCompressStringImp(pBuf, len, pOut, nOut); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } int32_t tsDecompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf, int32_t nBuf) { + int32_t code = 0; if (cmprAlg == ONE_STAGE_COMP) { return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT); } else if (cmprAlg == TWO_STAGE_COMP) { - if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code; return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_SMALLINT); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } @@ -2633,23 +2642,27 @@ int32_t tsCompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT); } else if (cmprAlg == TWO_STAGE_COMP) { int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_INT); + if (len < 0) { + return len; + } return tsCompressStringImp(pBuf, len, pOut, nOut); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } int32_t tsDecompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf, int32_t nBuf) { + int32_t code = 0; if (cmprAlg == ONE_STAGE_COMP) { return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT); } else if (cmprAlg == TWO_STAGE_COMP) { - if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code; return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_INT); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } @@ -2660,23 +2673,27 @@ int32_t tsCompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32 return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT); } else if (cmprAlg == TWO_STAGE_COMP) { int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_BIGINT); + if (len < 0) { + return len; + } return tsCompressStringImp(pBuf, len, pOut, nOut); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf, int32_t nBuf) { + int32_t code = 0; if (cmprAlg == ONE_STAGE_COMP) { return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT); } else if (cmprAlg == TWO_STAGE_COMP) { - if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code; return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT); } else { ASSERTS(0, "compress algo invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } } @@ -2713,14 +2730,14 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int int8_t alvl = tsGetCompressL2Level(l2, lvl); \ return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, alvl); \ } else { \ - uTrace("dencode:%s, decompress:%s, level:%d, type:%s", "disabled", compressL2Dict[l1].name, lvl, \ + uTrace("dencode:%s, decompress:%s, level:%d, type:%s", "disabled", compressL2Dict[l1].name, lvl, \ tDataTypes[type].name); \ return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, type); \ } \ } else { \ ASSERT(0); \ } \ - return -1; \ + return TSDB_CODE_INVALID_PARA; \ } while (1) /************************************************************************* diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index aa1a8e3148..a27d2efbcc 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -34,7 +34,7 @@ int32_t getWordLength(char type) { break; default: uError("Invalid decompress integer type:%d", type); - return -1; + return TSDB_CODE_INVALID_PARA; } return wordLength; @@ -156,7 +156,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, // 13 6 9 4 5 2 1 // 0 D7,D6 D6 D5,D4 D4 D3,D2 D2 // D1,D0 D0 +D5,D4 D5,D4, 0 0 D1,D0 D1,D0 - //0 0 D7~D4 D6~D4 D5~D4 D4 D3~D0 D2~D0 + // 0 0 D7~D4 D6~D4 D5~D4 D4 D3~D0 D2~D0 // D1~D0 D0 22 15 9 4 6 3 // 1 0 //