This commit is contained in:
yihaoDeng 2024-03-22 10:46:50 +00:00
parent 455d9e51fd
commit 6147860edb
3 changed files with 277 additions and 182 deletions

View File

@ -61,6 +61,29 @@
#include "td_sz.h"
#endif
// delta
int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressTimestampImp2(const char *const input, const int32_t nelements, char *const output,
const char type);
// simple8b
int32_t tsCompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type);
// bit
int32_t tsCompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type);
int32_t tsDecompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type);
// double specail
int32_t tsCompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type);
int32_t tsDecompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type);
int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output);
int32_t l2ComressInitImpl_disabled(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
return 0;
@ -125,17 +148,12 @@ int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, ch
if (ret == Z_OK) {
output[0] = 1;
return dstLen + 1;
} else if (ret == Z_MEM_ERROR) {
output[0] = 0;
memcpy(output + 1, input, inputSize);
return inputSize + 1;
} else if (ret == Z_STREAM_ERROR) {
} else {
output[0] = 0;
memcpy(output + 1, input, inputSize);
return inputSize + 1;
}
return 0;
return -1;
}
int32_t l2DecompressImpl_zlib(const char *const input, const int32_t compressedSize, char *const output,
int32_t outputSize, const char type) {
@ -203,6 +221,7 @@ int32_t l2CompressImpl_tsz(const char *const input, const int32_t inputSize, cha
return l2CompressImpl_lz4(input, inputSize, output, outputSize, type);
}
int32_t l2DecompressImpl_tsz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type) {
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
@ -227,7 +246,12 @@ int32_t l2DecompressImpl_xz(const char *const input, const int32_t nelements, ch
return 0;
}
TCompressL1FnSet compressL1Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}};
TCompressL1FnSet compressL1Dict[] = {{NULL, NULL, NULL},
{NULL, tsCompressTimestampImp2, tsDecompressTimestampImp2},
{NULL, tsCompressINTImp2, tsDecompressINTImp2},
{NULL, tsCompressDoubleImp2, tsDecompressDoubleImp2},
{NULL, tsCompressBoolImp2, tsDecompressBoolImp2},
{NULL, NULL, NULL}};
TCompressL2FnSet compressL2Dict[] = {{l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
{l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
{l2ComressInitImpl_zlib, l2CompressImpl_zlib, l2DecompressImpl_zlib},
@ -511,6 +535,35 @@ int32_t tsDecompressBoolImp(const char *const input, const int32_t nelements, ch
return nelements;
}
int32_t tsCompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type) {
return tsCompressBoolImp(input, nelements, output);
}
int32_t tsDecompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type) {
return tsDecompressBoolImp(input, nelements, output);
}
int32_t tsCompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type) {
if (type == TSDB_DATA_TYPE_FLOAT) {
return tsCompressFloatImp(input, nelements, output);
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
return tsCompressDoubleImp(input, nelements, output);
}
return -1;
}
int32_t tsDecompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type) {
if (type == TSDB_DATA_TYPE_FLOAT) {
return tsDecompressFloatImp(input, nelements, output);
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
return tsDecompressDoubleImp(input, nelements, output);
}
return -1;
}
int32_t tsCompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type) {
return tsCompressINTImp(input, nelements, output, type);
}
int32_t tsDecompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type) {
return tsDecompressINTImp(input, nelements, output, type);
}
#if 0
/* Run Length Encoding(RLE) Method */
@ -605,81 +658,6 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c
}
return -1;
}
// refactor later
int32_t tsCompressStringImp2(const char *const input, int32_t inputSize, char *const output, int32_t outputSize,
int8_t type) {
// Try to compress using LZ4 algorithm.
// if (type == L2_LZ4) {
// const int32_t compressed_data_size = LZ4_compress_default(input, output + 1, inputSize, outputSize - 1);
// // If cannot compress or after compression, data becomes larger.
// if (compressed_data_size <= 0 || compressed_data_size > inputSize) {
// /* First byte is for indicator */
// output[0] = 0;
// memcpy(output + 1, input, inputSize);
// return inputSize + 1;
// }
// output[0] = 1;
// return compressed_data_size + 1;
// } else if (type == L2_ZLIB) {
// uLongf dstLen = outputSize - 1;
// int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 0);
// if (ret == Z_OK) {
// output[0] = 1;
// return dstLen + 1;
// } else if (ret == Z_MEM_ERROR) {
// output[0] = 0;
// memcpy(output + 1, input, inputSize);
// return inputSize + 1;
// } else if (ret == Z_STREAM_ERROR) {
// output[0] = 0;
// memcpy(output + 1, input, inputSize);
// return inputSize + 1;
// }
// } else if (type == L2_ZSTD) {
// } else if (type == L2_TSZ) {
// } else if (type == L2_XZ) {
// } else if (type == L2_DISABLED) {
// ASSERT(0);
// }
return -1;
}
int32_t tsDecompressStringImp2(const char *const input, int32_t compressedSize, char *const output, int32_t outputSize,
int8_t type) {
// compressedSize is the size of data after compression.
l2CompressImpl_zstd(NULL, 0, 0, 0, 0);
return 0;
// if (input[0] == 1) {
// /* It is compressed by LZ4 algorithm */
// if (type == L2_LZ4) {
// const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
// if (decompressed_size < 0) {
// uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size);
// return -1;
// }
// return decompressed_size;
// } else if (type == L2_ZLIB) {
// } else if (type == L2_ZSTD) {
// } else if (type == L2_TSZ) {
// } else if (type == L2_XZ) {
// } else if (type == L2_DISABLED) {
// ASSERT(0);
// }
// } else if (input[0] == 0) {
// /* It is not compressed by LZ4 algorithm */
// memcpy(output, input + 1, compressedSize - 1);
// return compressedSize - 1;
// } else if (input[1] == 2) {
// uError("Invalid decompress string indicator:%d", input[0]);
// return -1;
// }
// return -1;
}
/* --------------------------------------------Timestamp Compression ---------------------------------------------- */
// TODO: Take care here, we assumes little endian encoding.
@ -858,6 +836,13 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
return nelements * longBytes;
}
int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type) {
return tsCompressTimestampImp(input, nelements, output);
}
int32_t tsDecompressTimestampImp2(const char *const input, const int32_t nelements, char *const output,
const char type) {
return tsDecompressTimestampImp(input, nelements, output);
}
/* --------------------------------------------Double Compression ---------------------------------------------- */
void encodeDoubleValue(uint64_t diff, uint8_t flag, char *const output, int32_t *const pos) {
@ -2649,99 +2634,56 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
}
}
#define FUNC_COMPRESS_IMPL1(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \
do { \
DEFINE_VAR(cmprAlg) \
if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
return l1Func(pIn, nEle, pOut); \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
if (compress) { \
int32_t len = l1Func(pIn, nEle, pBuf); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \
} else { \
if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \
return l1Func(pBuf, nEle, pOut); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
ASSERT(0); \
} else { \
ASSERT(0); \
} \
return -1; \
#define FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \
do { \
DEFINE_VAR(cmprAlg) \
if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
if (compress) { \
return compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \
} else { \
return compressL1Dict[l1].decomprFn(pIn, nEle, pBuf, type); \
} \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
if (compress) { \
int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \
} else { \
if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \
return compressL1Dict[l1].decomprFn(pBuf, nEle, pOut, type); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
if (compress) { \
return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type); \
} else { \
return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, type); \
} \
} else { \
ASSERT(0); \
} \
return -1; \
} while (1)
#define FUNC_COMPRESS_IMPL2(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \
do { \
DEFINE_VAR(cmprAlg) \
if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
return l1Func(pIn, nEle, pOut, type); \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
if (compress) { \
int32_t len = l1Func(pIn, nEle, pBuf, type); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \
} else { \
if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \
return l1Func(pBuf, nEle, pOut, type); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
ASSERT(0); \
} else { \
ASSERT(0); \
} \
return -1; \
} while (1)
// #define FUNC_COMPRESS_IMPL3(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \
// do { \
// if (type != TSDB_DATA_TYPE_FLOAT && type != TSDB_DATA_RTYPE_DOUBLE { \
// return -1; \
// } \
// DEFINE_VAR(cmprAlg) \
// if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
// return l1Func(pIn, nEle, pOut, type); \
// } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
// int32_t len = l1Func(pIn, nEle, pBuf, type); \
// if (compress) { \
// return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \
// } else { \
// return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \
// } \
// } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
// ASSERT(0); \
// } else { \
// ASSERT(0); \
// } \
// return -1; \
// } while (1)
// typedef int32_t (*__compress_fn)(const char *input, const int32_t nEle, char *const output);
/*************************************************************************
* REGULAR COMPRESSION 2
*************************************************************************/
// Timestamp =====================================================
int32_t tsCompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
FUNC_COMPRESS_IMPL1(tsCompressTimestampImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TIMESTAMP,
1);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TIMESTAMP, 1);
}
int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
FUNC_COMPRESS_IMPL1(tsDecompressTimestampImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf,
TSDB_DATA_TYPE_TIMESTAMP, 0);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TIMESTAMP, 0);
}
// Float =====================================================
int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
// if (lossyFloat) {
// return tsCompressFloatLossyImp(pIn, nEle, pOut);
// } else {
// FUNC_COMPRESS_IMPL1(tsCompressFloatImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1);
if (lossyFloat) {
return tsCompressFloatLossyImp(pIn, nEle, pOut);
}
FUNC_COMPRESS_IMPL1(tsCompressFloatImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1);
}
int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
@ -2749,7 +2691,7 @@ int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut);
}
FUNC_COMPRESS_IMPL1(tsDecompressFloatImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 0);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 0);
}
// Double =====================================================
@ -2759,7 +2701,7 @@ int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3
// lossy mode
return tsCompressDoubleLossyImp(pIn, nEle, pOut);
}
FUNC_COMPRESS_IMPL1(tsCompressDoubleImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 1);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 1);
}
int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
@ -2768,7 +2710,7 @@ int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, in
// decompress lossy
return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut);
}
FUNC_COMPRESS_IMPL1(tsDecompressDoubleImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 0);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 0);
}
// Binary =====================================================
@ -2788,56 +2730,56 @@ int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, in
// Bool =====================================================
int32_t tsCompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
FUNC_COMPRESS_IMPL1(tsCompressBoolImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 1);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 1);
}
int32_t tsDecompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
FUNC_COMPRESS_IMPL1(tsDecompressBoolImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 0);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 0);
}
// Tinyint =====================================================
int32_t tsCompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 1);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 1);
}
int32_t tsDecompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 0);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 0);
}
// Smallint =====================================================
int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 1);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 1);
}
int32_t tsDecompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 0);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 0);
}
// Int =====================================================
int32_t tsCompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 1);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 1);
}
int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 0);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 0);
}
// Bigint =====================================================
int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
FUNC_COMPRESS_IMPL2(tsCompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 1);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 1);
}
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
FUNC_COMPRESS_IMPL2(tsDecompressINTImp, pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0);
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0);
}
int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn);

View File

@ -32,6 +32,7 @@ ENDIF()
#ENDIF ()
INCLUDE_DIRECTORIES(${TD_SOURCE_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_SOURCE_DIR}/include/common)
# arrayTest
add_executable(arrayTest "arrayTest.cpp")
@ -118,7 +119,7 @@ add_test(
)
add_executable(decompressTest "decompressTest.cpp")
target_link_libraries(decompressTest os util gtest_main)
target_link_libraries(decompressTest os util common gtest_main)
add_test(
NAME decompressTest
COMMAND decompressTest

View File

@ -2,6 +2,7 @@
#include <stdlib.h>
#include <tcompression.h>
#include <random>
#include "ttypes.h"
namespace {} // namespace
@ -117,29 +118,92 @@ void compressImplTest(void* pVal, int8_t type, int32_t sz, uint32_t cmprAlg) {
int64_t* pList = (int64_t*)pVal;
int32_t num = sz;
char* px = static_cast<char*>(taosMemoryMalloc(num * sizeof(int64_t)));
char* pBuf = static_cast<char*>(taosMemoryMalloc(num * sizeof(int64_t) + 64));
int32_t len =
tsCompressTimestamp2(pList, num * sizeof(int64_t), num, px, num, cmprAlg, pBuf, num * sizeof(int64_t) + 64);
int64_t bytes = 0; // tDataTypeDescriptor[TSDB_DATA_TYPE_TIMESTAMP].
char* px = static_cast<char*>(taosMemoryMalloc(num * sizeof(int64_t)));
char* pBuf = static_cast<char*>(taosMemoryMalloc(num * sizeof(int64_t) + 64));
int32_t len = tsCompressTimestamp2(pList, num * sizeof(int64_t), num, px, num * sizeof(int64_t) + 64, cmprAlg, pBuf,
num * sizeof(int64_t) + 64);
printf("compresess size: %d, actual size: %d\n", len, (int32_t)(num * sizeof(int64_t)));
char* pOutput = static_cast<char*>(taosMemoryCalloc(1, num * sizeof(int64_t)));
memset(pBuf, 0, num * sizeof(int64_t) + 64);
int32_t size =
tsDecompressTimestamp2(px, len, num, pOutput, sizeof(int64_t) * num, cmprAlg, pBuf, num * sizeof(int64_t) + 64);
printf("size: %d\n", size);
int32_t size = tsDecompressTimestamp2(px, len, num, pOutput, sizeof(int64_t) * num + 64, cmprAlg, pBuf,
num * sizeof(int64_t) + 64);
for (int i = 0; i < num; i++) {
int64_t val = *(int64_t*)(pOutput + i * sizeof(int64_t));
int32_t ival = val;
if (i < 10) printf("val = %d\n", ival);
ASSERT_EQ(val, pList[i]);
}
taosMemoryFree(px);
taosMemoryFree(pBuf);
taosMemoryFree(pOutput);
}
}
TEST(utilTest, zstdtest) {
int32_t num = 10000;
const char* alg[] = {"disabled", "lz4", "zlib", "zstd", "tsz", "xz"};
const char* end[] = {"disabled", "simppe8b", "delta"};
void compressImplTestByAlg(void* pVal, int8_t type, int32_t num, uint32_t cmprAlg) {
{
tDataTypeCompress compres = tDataCompress[type];
int32_t bytes = compres.bytes * num;
int32_t externalSize = bytes + 64;
char* px = static_cast<char*>(taosMemoryMalloc(externalSize));
char* pBuf = static_cast<char*>(taosMemoryMalloc(externalSize));
DEFINE_VAR(cmprAlg)
int32_t len = compres.compFunc(pVal, bytes, num, px, externalSize, cmprAlg, pBuf, externalSize);
printf("encode:%s, compress alg:%s, type:%s, compresess size: %d, actual size: %d, radio: %f\n", end[l1], alg[l2],
compres.name, len, bytes, (float)len / bytes);
char* pOutput = static_cast<char*>(taosMemoryCalloc(1, bytes));
memset(pBuf, 0, externalSize);
int32_t size = compres.decompFunc(px, len, num, pOutput, externalSize, cmprAlg, pBuf, externalSize);
ASSERT_EQ(size, bytes);
taosMemoryFree(px);
taosMemoryFree(pBuf);
taosMemoryFree(pOutput);
// taosMemoryFree(pVal);
}
}
int32_t fillDataByData(char* pBuf, void* pData, int32_t nBytes) {
memcpy(pBuf, pData, nBytes);
return 0;
}
void* genCompressData(int32_t type, int32_t num, bool order) {
tDataTypeDescriptor desc = tDataTypes[type];
int32_t cnt = num * (desc.bytes);
char* pBuf = (char*)taosMemoryCalloc(1, cnt);
char* p = pBuf;
uint32_t v = taosGetTimestampMs();
int64_t iniVal = 0;
for (int32_t i = 0; i < num; i++) {
int64_t d = taosRandR(&v);
if (type == TSDB_DATA_TYPE_BOOL) {
int8_t val = d % 2;
fillDataByData(p, &val, desc.bytes);
} else if (type == TSDB_DATA_TYPE_TINYINT) {
int8_t val = d % INT8_MAX;
fillDataByData(p, &val, desc.bytes);
} else if (type == TSDB_DATA_TYPE_SMALLINT) {
int16_t val = d % INT16_MAX;
fillDataByData(p, &val, desc.bytes);
} else if (type == TSDB_DATA_TYPE_INT) {
int32_t val = d % INT32_MAX;
fillDataByData(p, &val, desc.bytes);
} else if (type == TSDB_DATA_TYPE_BIGINT) {
int64_t val = d % INT64_MAX;
fillDataByData(p, &val, desc.bytes);
}
p += desc.bytes;
}
return pBuf;
}
TEST(utilTest, compressAlg) {
int32_t num = 4096;
int64_t* pList = static_cast<int64_t*>(taosMemoryCalloc(num, sizeof(int64_t)));
int64_t iniVal = 17000;
@ -149,10 +213,12 @@ TEST(utilTest, zstdtest) {
iniVal += i;
pList[i] = iniVal;
}
printf("ordered data\n");
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 1);
setColEncode(&cmprAlg, 1);
compressImplTest((void*)pList, 0, num, cmprAlg);
}
@ -171,4 +237,90 @@ TEST(utilTest, zstdtest) {
compressImplTest((void*)pList, 0, num, cmprAlg);
}
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 1);
// setColEncode(&cmprAlg, 1);
compressImplTest((void*)pList, 0, num, cmprAlg);
}
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 2);
// setColEncode(&cmprAlg, 1);
compressImplTest((void*)pList, 0, num, cmprAlg);
}
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 3);
// setColEncode(&cmprAlg, 1);
compressImplTest((void*)pList, 0, num, cmprAlg);
}
printf("unoreder data\n");
for (int32_t i = 0; i < num; ++i) {
iniVal = taosRandR(&v);
pList[i] = iniVal;
}
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 1);
setColEncode(&cmprAlg, 1);
compressImplTest((void*)pList, 0, num, cmprAlg);
}
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 2);
setColEncode(&cmprAlg, 1);
compressImplTest((void*)pList, 0, num, cmprAlg);
}
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 3);
setColEncode(&cmprAlg, 1);
compressImplTest((void*)pList, 0, num, cmprAlg);
}
printf("unoreder data, no encode\n");
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 1);
// setColEncode(&cmprAlg, 0);
compressImplTest((void*)pList, 0, num, cmprAlg);
}
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 2);
// setColEncode(&cmprAlg, 1);
compressImplTest((void*)pList, 0, num, cmprAlg);
}
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 3);
// setColEncode(&cmprAlg, 1);
compressImplTest((void*)pList, 0, num, cmprAlg);
}
{
for (int8_t type = 1; type <= 5; type++) {
printf("------summary, type: %s-------\n", tDataTypes[type].name);
char* p = (char*)genCompressData(type, num, 0);
for (int8_t i = 1; i <= 3; i++) {
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, i);
// setColEncode(&cmprAlg, 1);
compressImplTestByAlg(p, type, num, cmprAlg);
}
taosMemoryFree(p);
printf("-------------");
}
}
}