add more compress alg

This commit is contained in:
yihaoDeng 2024-03-20 03:13:29 +00:00
parent ddb326b550
commit ba7342e50c
2 changed files with 361 additions and 233 deletions

View File

@ -19,6 +19,7 @@ target_include_directories(
PRIVATE "${TD_SOURCE_DIR}/include/common"
PRIVATE "${GRANT_CFG_INCLUDE_DIR}"
PRIVATE "${TD_SOURCE_DIR}/utils/TSZ/sz/inc"
PRIVATE "${TD_SOURCE_DIR}/utils/TSZ/zstd/"
)
target_link_libraries(
util

View File

@ -54,10 +54,188 @@
#include "tlog.h"
#include "tmsg.h"
#include "zlib.h"
#include "zstd.h"
#ifdef TD_TSZ
#include "td_sz.h"
#endif
int32_t l2ComressInitImpl_disabled(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
return 0;
}
int32_t l2CompressImpl_disabled(const char *const input, const int32_t nelements, char *const output,
int32_t outputSize, const char type) {
return 0;
}
int32_t l2DecompressImpl_disabled(const char *const input, const int32_t nelements, char *const output,
int32_t outputSize, const char type) {
return 0;
}
int32_t l2ComressInitImpl_lz4(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
return 0;
}
int32_t l2CompressImpl_lz4(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type) {
const int32_t compressed_data_size = LZ4_compress_default(input, output + 1, inputSize, outputSize - 1);
// If cannot compress or after compression, data becomes larger.
if (compressed_data_size <= 0 || compressed_data_size > inputSize) {
/* First byte is for indicator */
output[0] = 0;
memcpy(output + 1, input, inputSize);
return inputSize + 1;
}
output[0] = 1;
return compressed_data_size + 1;
}
int32_t l2DecompressImpl_lz4(const char *const input, const int32_t compressedSize, char *const output,
int32_t outputSize, const char type) {
if (input[0] == 1) {
/* It is compressed by LZ4 algorithm */
const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
if (decompressed_size < 0) {
uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size);
return -1;
}
return decompressed_size;
} else if (input[0] == 0) {
/* It is not compressed by LZ4 algorithm */
memcpy(output, input + 1, compressedSize - 1);
return compressedSize - 1;
} else if (input[1] == 2) {
uError("Invalid decompress string indicator:%d", input[0]);
return -1;
}
return -1;
}
int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
return 0;
}
int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type) {
uLongf dstLen = outputSize - 1;
int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 0);
if (ret == Z_OK) {
output[0] = 1;
return dstLen + 1;
} else if (ret == Z_MEM_ERROR) {
output[0] = 0;
memcpy(output + 1, input, inputSize);
return inputSize + 1;
} else if (ret == Z_STREAM_ERROR) {
output[0] = 0;
memcpy(output + 1, input, inputSize);
return inputSize + 1;
}
return 0;
}
int32_t l2DecompressImpl_zlib(const char *const input, const int32_t compressedSize, char *const output,
int32_t outputSize, const char type) {
if (input[0] == 1) {
uLongf len = 0;
int ret = uncompress((Bytef *)output, &len, (Bytef *)input + 1, compressedSize - 1);
if (ret == Z_OK) {
return len;
} else {
return -1;
}
} else if (input[0] == 0) {
/* It is not compressed by LZ4 algorithm */
memcpy(output, input + 1, compressedSize - 1);
return compressedSize - 1;
} else if (input[1] == 2) {
uError("Invalid decompress string indicator:%d", input[0]);
return -1;
}
return 0;
}
int32_t l2ComressInitImpl_zstd(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
return 0;
}
int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type) {
size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, 0);
if (len > inputSize) {
output[0] = 0;
memcpy(output + 1, input, inputSize);
return inputSize + 1;
}
output[0] = 1;
return len + 1;
}
int32_t l2DecompressImpl_zstd(const char *const input, const int32_t compressedSize, char *const output,
int32_t outputSize, const char type) {
if (input[0] == 1) {
size_t len = ZSTD_decompress(output, outputSize, input + 1, compressedSize - 1);
return len;
} else if (input[0] == 0) {
memcpy(output, input + 1, compressedSize - 1);
return compressedSize - 1;
}
return -1;
}
int32_t l2ComressInitImpl_tsz(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
return 0;
}
int32_t l2CompressImpl_tsz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type) {
if (type == TSDB_DATA_TYPE_FLOAT) {
if (lossyFloat) {
return tsCompressFloatLossyImp(input, inputSize, output);
}
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
if (lossyDouble) {
return tsCompressDoubleLossyImp(input, inputSize, output);
}
}
return l2CompressImpl_lz4(input, inputSize, output, outputSize, type);
}
int32_t l2DecompressImpl_tsz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type) {
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
if (HEAD_ALGO(((uint8_t *)input)[0]) == ALGO_SZ_LOSSY) {
return tsDecompressFloatLossyImp(input, inputSize, outputSize, output);
}
}
return l2DecompressImpl_lz4(input, inputSize, output, outputSize, type);
}
int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
return 0;
}
int32_t l2CompressImpl_xz(const char *const input, const int32_t nelements, char *const output, int32_t outputSize,
const char type) {
return 0;
}
int32_t l2DecompressImpl_xz(const char *const input, const int32_t nelements, char *const output, int32_t outputSize,
const char type) {
return 0;
}
TCompressL1FnSet compressL1Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}};
TCompressL2FnSet compressL2Dict[] = {{l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
{l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
{l2ComressInitImpl_zlib, l2CompressImpl_zlib, l2DecompressImpl_zlib},
{l2ComressInitImpl_zstd, l2CompressImpl_zstd, l2DecompressImpl_zstd},
{l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz},
{l2ComressInitImpl_xz, l2CompressImpl_xz, l2DecompressImpl_xz}};
static const int32_t TEST_NUMBER = 1;
#define is_bigendian() ((*(char *)&TEST_NUMBER) == 0)
#define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL)
@ -422,14 +600,91 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c
/* It is not compressed by LZ4 algorithm */
memcpy(output, input + 1, compressedSize - 1);
return compressedSize - 1;
} else {
} else if (input[1] == 2) {
uError("Invalid decompress string indicator:%d", input[0]);
return -1;
}
return -1;
}
// refactor later
int32_t tsCompressStringImp2(const char *const input, int32_t inputSize, char *const output, int32_t outputSize,
int8_t type) {
// Try to compress using LZ4 algorithm.
// if (type == L2_LZ4) {
// const int32_t compressed_data_size = LZ4_compress_default(input, output + 1, inputSize, outputSize - 1);
// // If cannot compress or after compression, data becomes larger.
// if (compressed_data_size <= 0 || compressed_data_size > inputSize) {
// /* First byte is for indicator */
// output[0] = 0;
// memcpy(output + 1, input, inputSize);
// return inputSize + 1;
// }
// output[0] = 1;
// return compressed_data_size + 1;
// } else if (type == L2_ZLIB) {
// uLongf dstLen = outputSize - 1;
// int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 0);
// if (ret == Z_OK) {
// output[0] = 1;
// return dstLen + 1;
// } else if (ret == Z_MEM_ERROR) {
// output[0] = 0;
// memcpy(output + 1, input, inputSize);
// return inputSize + 1;
// } else if (ret == Z_STREAM_ERROR) {
// output[0] = 0;
// memcpy(output + 1, input, inputSize);
// return inputSize + 1;
// }
// } else if (type == L2_ZSTD) {
// } else if (type == L2_TSZ) {
// } else if (type == L2_XZ) {
// } else if (type == L2_DISABLED) {
// ASSERT(0);
// }
return -1;
}
int32_t tsDecompressStringImp2(const char *const input, int32_t compressedSize, char *const output, int32_t outputSize,
int8_t type) {
// compressedSize is the size of data after compression.
l2CompressImpl_zstd(NULL, 0, 0, 0, 0);
return 0;
// if (input[0] == 1) {
// /* It is compressed by LZ4 algorithm */
// if (type == L2_LZ4) {
// const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
// if (decompressed_size < 0) {
// uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size);
// return -1;
// }
// return decompressed_size;
// } else if (type == L2_ZLIB) {
// } else if (type == L2_ZSTD) {
// } else if (type == L2_TSZ) {
// } else if (type == L2_XZ) {
// } else if (type == L2_DISABLED) {
// ASSERT(0);
// }
// } else if (input[0] == 0) {
// /* It is not compressed by LZ4 algorithm */
// memcpy(output, input + 1, compressedSize - 1);
// return compressedSize - 1;
// } else if (input[1] == 2) {
// uError("Invalid decompress string indicator:%d", input[0]);
// return -1;
// }
// return -1;
}
/* --------------------------------------------Timestamp Compression ---------------------------------------------- */
// TODO: Take care here, we assumes little endian encoding.
//
int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
int32_t _pos = 1;
int32_t longBytes = LONG_BYTES;
@ -2395,319 +2650,194 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
}
}
#define FUNC_COMPRESS_IMPL1(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \
do { \
DEFINE_VAR(cmprAlg) \
if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
return l1Func(pIn, nEle, pOut); \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
int32_t len = l1Func(pIn, nEle, pBuf); \
if (compress) { \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \
} else { \
return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
ASSERT(0); \
} else { \
ASSERT(0); \
} \
return -1; \
} while (1)
#define FUNC_COMPRESS_IMPL2(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \
do { \
DEFINE_VAR(cmprAlg) \
if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
return l1Func(pIn, nEle, pOut, type); \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
int32_t len = l1Func(pIn, nEle, pBuf, type); \
if (compress) { \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \
} else { \
return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
ASSERT(0); \
} else { \
ASSERT(0); \
} \
return -1; \
} while (1)
#define FUNC_COMPRESS_IMPL3(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \
do { \
if (type != TSDB_DATA_TYPE_FLOAT && type != TSDB_DATA_RTYPE_DOUBLE { \
return -1; \
} \
DEFINE_VAR(cmprAlg) \
if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
return l1Func(pIn, nEle, pOut, type); \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
int32_t len = l1Func(pIn, nEle, pBuf, type); \
if (compress) { \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \
} else { \
return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
ASSERT(0); \
} else { \
ASSERT(0); \
} \
return -1; \
} while (1)
// typedef int32_t (*__compress_fn)(const char *input, const int32_t nEle, char *const output);
/*************************************************************************
* REGULAR COMPRESSION 2
*************************************************************************/
// Timestamp =====================================================
int32_t tsCompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
DEFINE_VAR(cmprAlg)
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
return tsCompressTimestampImp(pIn, nEle, pOut);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
int32_t len = tsCompressTimestampImp(pIn, nEle, pBuf);
return tsCompressStringImp(pBuf, len, pOut, nOut);
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
ASSERT(0);
} else if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
ASSERT(0);
}
return -1;
FUNC_COMPRESS_IMPL1(tsCompressTimestampImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TIMESTAMP,
1);
}
int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
DEFINE_VAR(cmprAlg)
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
return tsDecompressTimestampImp(pIn, nEle, pOut);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
int32_t len = tsDecompressStringImp(pIn, nIn, pBuf, nBuf);
if (len < 0) return -1;
return tsDecompressTimestampImp(pBuf, nEle, pOut);
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
ASSERT(0);
} else if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
ASSERT(0);
}
return 0;
FUNC_COMPRESS_IMPL1(tsDecompressTimestampImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf,
TSDB_DATA_TYPE_TIMESTAMP, 0);
}
// Float =====================================================
int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
return 0;
// #ifdef TD_TSZ
// // lossy mode
// if (lossyFloat) {
// return tsCompressFloatLossyImp(pIn, nEle, pOut);
// // lossless mode
// } else {
// #endif
// if (cmprAlg == ONE_STAGE_COMP) {
// return tsCompressFloatImp(pIn, nEle, pOut);
// } else if (cmprAlg == TWO_STAGE_COMP) {
// int32_t len = tsCompressFloatImp(pIn, nEle, pBuf);
// return tsCompressStringImp(pBuf, len, pOut, nOut);
// } else {
// ASSERTS(0, "compress algo invalid");
// return -1;
// }
// #ifdef TD_TSZ
// }
// #endif
// FUNC_COMPRESS_IMPL1(tsCompressFloatImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1);
if (lossyFloat) {
return tsCompressFloatLossyImp(pIn, nEle, pOut);
}
FUNC_COMPRESS_IMPL1(tsCompressFloatImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1);
}
int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
return 0;
// #ifdef TD_TSZ
// if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
// // decompress lossy
// return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut);
// } else {
// #endif
// // decompress lossless
// if (cmprAlg == ONE_STAGE_COMP) {
// return tsDecompressFloatImp(pIn, nEle, pOut);
// } else if (cmprAlg == TWO_STAGE_COMP) {
// if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
// return tsDecompressFloatImp(pBuf, nEle, pOut);
// } else {
// ASSERTS(0, "compress algo invalid");
// return -1;
// }
// #ifdef TD_TSZ
// }
// #endif
if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut);
}
FUNC_COMPRESS_IMPL1(tsDecompressFloatImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 0);
}
// Double =====================================================
int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
return 0;
// #ifdef TD_TSZ
// if (lossyDouble) {
// // lossy mode
// return tsCompressDoubleLossyImp(pIn, nEle, pOut);
// } else {
// #endif
// // lossless mode
// if (cmprAlg == ONE_STAGE_COMP) {
// return tsCompressDoubleImp(pIn, nEle, pOut);
// } else if (cmprAlg == TWO_STAGE_COMP) {
// int32_t len = tsCompressDoubleImp(pIn, nEle, pBuf);
// return tsCompressStringImp(pBuf, len, pOut, nOut);
// } else {
// ASSERTS(0, "compress algo invalid");
// return -1;
// }
// #ifdef TD_TSZ
// }
// #endif
if (lossyDouble) {
// lossy mode
return tsCompressDoubleLossyImp(pIn, nEle, pOut);
}
FUNC_COMPRESS_IMPL1(tsCompressDoubleImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 1);
}
int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
return 0;
// #ifdef TD_TSZ
// if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
// // decompress lossy
// return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut);
// } else {
// #endif
// // decompress lossless
// if (cmprAlg == ONE_STAGE_COMP) {
// return tsDecompressDoubleImp(pIn, nEle, pOut);
// } else if (cmprAlg == TWO_STAGE_COMP) {
// if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
// return tsDecompressDoubleImp(pBuf, nEle, pOut);
// } else {
// ASSERTS(0, "compress algo invalid");
// return -1;
// }
// #ifdef TD_TSZ
// }
// #endif
if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
// decompress lossy
return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut);
}
FUNC_COMPRESS_IMPL1(tsDecompressDoubleImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 0);
}
// Binary =====================================================
int32_t tsCompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
return tsCompressStringImp(pIn, nIn, pOut, nOut);
DEFINE_VAR(cmprAlg)
return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, TSDB_DATA_TYPE_BINARY);
}
int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
// return 0;
return tsDecompressStringImp(pIn, nIn, pOut, nOut);
DEFINE_VAR(cmprAlg)
return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, TSDB_DATA_TYPE_BINARY);
}
// Bool =====================================================
int32_t tsCompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
DEFINE_VAR(cmprAlg)
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
return tsCompressBoolImp(pIn, nEle, pOut);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
int32_t len = tsCompressBoolImp(pIn, nIn, pBuf);
if (len < 0) return -1;
return tsCompressStringImp(pBuf, len, pOut, nOut);
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
ASSERT(0);
} else if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
ASSERT(0);
}
return -1;
FUNC_COMPRESS_IMPL1(tsCompressBoolImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 1);
}
int32_t tsDecompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
DEFINE_VAR(cmprAlg)
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
return tsDecompressBoolImp(pIn, nEle, pOut);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
return tsDecompressBoolImp(pBuf, nEle, pOut);
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
return -1;
} else if (l2 == L1_DISABLED && l2 == L2_DISABLED) {
return -1;
}
return -1;
FUNC_COMPRESS_IMPL1(tsDecompressBoolImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 0);
}
// Tinyint =====================================================
int32_t tsCompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
return 0;
// if (cmprAlg == ONE_STAGE_COMP) {
// return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
// } else if (cmprAlg == TWO_STAGE_COMP) {
// int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_TINYINT);
// return tsCompressStringImp(pBuf, len, pOut, nOut);
// } else {
// ASSERTS(0, "compress algo invalid");
// return -1;
// }
FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 1);
}
int32_t tsDecompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
return 0;
// if (cmprAlg == ONE_STAGE_COMP) {
// return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
// } else if (cmprAlg == TWO_STAGE_COMP) {
// if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
// return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
// } else {
// ASSERTS(0, "compress algo invalid");
// return -1;
// }
FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 0);
}
// Smallint =====================================================
int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
return 0;
// if (cmprAlg == ONE_STAGE_COMP) {
// return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
// } else if (cmprAlg == TWO_STAGE_COMP) {
// int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_SMALLINT);
// return tsCompressStringImp(pBuf, len, pOut, nOut);
// } else {
// ASSERTS(0, "compress algo invalid");
// return -1;
// }
FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 1);
}
int32_t tsDecompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
return 0;
// if (cmprAlg == ONE_STAGE_COMP) {
// return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
// } else if (cmprAlg == TWO_STAGE_COMP) {
// if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
// return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
// } else {
// ASSERTS(0, "compress algo invalid");
// return -1;
// }
FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 0);
}
// Int =====================================================
int32_t tsCompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
DEFINE_VAR(cmprAlg)
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_INT);
return tsCompressStringImp(pBuf, len, pOut, nOut);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
ASSERTS(0, "compress algo invalid");
return -1;
} else if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
ASSERTS(0, "compress algo invalid");
return -1;
}
return -1;
FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 1);
}
int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
DEFINE_VAR(cmprAlg)
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_INT);
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
ASSERTS(0, "compress algo invalid");
return -1;
} else if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
return -1;
}
return -1;
FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 0);
}
// Bigint =====================================================
int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
DEFINE_VAR(cmprAlg)
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_BIGINT);
return tsCompressStringImp(pBuf, len, pOut, nOut);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
ASSERTS(0, "compress algo invalid");
return -1;
} else if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
ASSERTS(0, "compress algo invalid");
return -1;
}
return 0;
FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 1);
}
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
DEFINE_VAR(cmprAlg)
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
ASSERTS(0, "compress algo invalid");
return -1;
} else if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
return -1;
}
return -1;
FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0);
}
int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn);
int32_t tsCompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
@ -2751,9 +2881,6 @@ int32_t tsDecompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void
return len;
}
TCompressL1FnSet compressL1Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}};
TCompressL2FnSet compressL2Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}};
int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn) {
int8_t l1 = COMPRESS_L1_TYPE_U8(compress);
int8_t l2 = COMPRESS_L2_TYPE_U8(compress);