This commit is contained in:
yihaoDeng 2024-03-21 13:02:28 +00:00
parent 5407ac38db
commit 374f8e0462
2 changed files with 81 additions and 49 deletions

View File

@ -121,7 +121,7 @@ int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPre
int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type) {
uLongf dstLen = outputSize - 1;
int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 0);
int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 9);
if (ret == Z_OK) {
output[0] = 1;
return dstLen + 1;
@ -140,7 +140,7 @@ int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, ch
int32_t l2DecompressImpl_zlib(const char *const input, const int32_t compressedSize, char *const output,
int32_t outputSize, const char type) {
if (input[0] == 1) {
uLongf len = 0;
uLongf len = outputSize;
int ret = uncompress((Bytef *)output, &len, (Bytef *)input + 1, compressedSize - 1);
if (ret == Z_OK) {
return len;
@ -165,7 +165,7 @@ int32_t l2ComressInitImpl_zstd(char *lossyColumns, float fPrecision, double dPre
int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type) {
size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, 0);
size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, ZSTD_CLEVEL_DEFAULT);
if (len > inputSize) {
output[0] = 0;
memcpy(output + 1, input, inputSize);
@ -178,8 +178,7 @@ int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, ch
int32_t l2DecompressImpl_zstd(const char *const input, const int32_t compressedSize, char *const output,
int32_t outputSize, const char type) {
if (input[0] == 1) {
size_t len = ZSTD_decompress(output, outputSize, input + 1, compressedSize - 1);
return len;
return ZSTD_decompress(output, outputSize, input + 1, compressedSize - 1);
} else if (input[0] == 0) {
memcpy(output, input + 1, compressedSize - 1);
return compressedSize - 1;
@ -2656,11 +2655,12 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
return l1Func(pIn, nEle, pOut); \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
int32_t len = l1Func(pIn, nEle, pBuf); \
if (compress) { \
int32_t len = l1Func(pIn, nEle, pBuf); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \
} else { \
return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \
if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \
return l1Func(pBuf, nEle, pOut); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
ASSERT(0); \
@ -2676,11 +2676,12 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
return l1Func(pIn, nEle, pOut, type); \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
int32_t len = l1Func(pIn, nEle, pBuf, type); \
if (compress) { \
int32_t len = l1Func(pIn, nEle, pBuf, type); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \
} else { \
return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \
if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \
return l1Func(pBuf, nEle, pOut, type); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
ASSERT(0); \
@ -2689,28 +2690,29 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
} \
return -1; \
} while (1)
#define FUNC_COMPRESS_IMPL3(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \
do { \
if (type != TSDB_DATA_TYPE_FLOAT && type != TSDB_DATA_RTYPE_DOUBLE { \
return -1; \
} \
DEFINE_VAR(cmprAlg) \
if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
return l1Func(pIn, nEle, pOut, type); \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
int32_t len = l1Func(pIn, nEle, pBuf, type); \
if (compress) { \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \
} else { \
return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
ASSERT(0); \
} else { \
ASSERT(0); \
} \
return -1; \
} while (1)
// #define FUNC_COMPRESS_IMPL3(l1Func, pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \
// do { \
// if (type != TSDB_DATA_TYPE_FLOAT && type != TSDB_DATA_RTYPE_DOUBLE { \
// return -1; \
// } \
// DEFINE_VAR(cmprAlg) \
// if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
// return l1Func(pIn, nEle, pOut, type); \
// } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
// int32_t len = l1Func(pIn, nEle, pBuf, type); \
// if (compress) { \
// return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type); \
// } else { \
// return compressL2Dict[l2].decomprFn(pBuf, len, pOut, nOut, type); \
// } \
// } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
// ASSERT(0); \
// } else { \
// ASSERT(0); \
// } \
// return -1; \
// } while (1)
// typedef int32_t (*__compress_fn)(const char *input, const int32_t nEle, char *const output);
/*************************************************************************

View File

@ -111,34 +111,64 @@ void setColLevel(uint32_t* compress, uint8_t level) {
*compress |= level;
return;
}
TEST(utilTest, zstdtest) {
int32_t num = 10000;
void compressImplTest(void* pVal, int8_t type, int32_t sz, uint32_t cmprAlg) {
{
int64_t* pList = (int64_t*)pVal;
int32_t num = sz;
char* px = static_cast<char*>(taosMemoryMalloc(num * sizeof(int64_t)));
char* pBuf = static_cast<char*>(taosMemoryMalloc(num * sizeof(int64_t) + 64));
int32_t len =
tsCompressTimestamp2(pList, num * sizeof(int64_t), num, px, num, cmprAlg, pBuf, num * sizeof(int64_t) + 64);
char* pOutput = static_cast<char*>(taosMemoryCalloc(1, num * sizeof(int64_t)));
memset(pBuf, 0, num * sizeof(int64_t) + 64);
int32_t size =
tsDecompressTimestamp2(px, len, num, pOutput, sizeof(int64_t) * num, cmprAlg, pBuf, num * sizeof(int64_t) + 64);
printf("size: %d\n", size);
for (int i = 0; i < num; i++) {
int64_t val = *(int64_t*)(pOutput + i * sizeof(int64_t));
int32_t ival = val;
if (i < 100) printf("val = %d\n", ival);
}
taosMemoryFree(px);
taosMemoryFree(pBuf);
taosMemoryFree(pOutput);
}
}
TEST(utilTest, zstdtest) {
int32_t num = 10000;
int64_t* pList = static_cast<int64_t*>(taosMemoryCalloc(num, sizeof(int64_t)));
int64_t iniVal = 1700000000;
int64_t iniVal = 17000;
uint32_t v = 100;
for (int32_t i = 0; i < num; ++i) {
iniVal += num;
iniVal += i;
pList[i] = iniVal;
}
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 1);
setColEncode(&cmprAlg, 1);
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 1);
setColEncode(&cmprAlg, 1);
char* pOutput = static_cast<char*>(taosMemoryMalloc(num * sizeof(int64_t)));
compressImplTest((void*)pList, 0, num, cmprAlg);
}
int32_t bufSize = num * sizeof(int64_t) + 64;
char* pBuf = static_cast<char*>(taosMemoryMalloc(bufSize));
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 2);
setColEncode(&cmprAlg, 1);
int32_t sz =
tsCompressTimestamp2(pList, num * sizeof(int64_t), num, pOutput, num * sizeof(int64_t), cmprAlg, pBuf, bufSize);
printf("compress size: %d", sz);
compressImplTest((void*)pList, 0, num, cmprAlg);
}
{
uint32_t cmprAlg = 0;
setColCompress(&cmprAlg, 3);
setColEncode(&cmprAlg, 1);
tsDecompressTimestamp(pOutput, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf, int32_t nBuf)
compressImplTest((void*)pList, 0, num, cmprAlg);
}
}