From f3ac2e1ef954b78494a2773fde543f01c6fa9432 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 22 Apr 2024 03:56:45 +0000 Subject: [PATCH] add tmq interface --- include/util/tcompression.h | 35 +++---- source/util/src/tcompression.c | 161 ++++++--------------------------- 2 files changed, 48 insertions(+), 148 deletions(-) diff --git a/include/util/tcompression.h b/include/util/tcompression.h index ab98058b87..683a8f2cee 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -236,35 +236,25 @@ typedef struct { __data_compress_init initFn; __data_compress_l1_fn_t comprFn; __data_decompress_l1_fn_t decomprFn; -} TCompressL1FnSet; +} TCmprL1FnSet; typedef struct { char *name; __data_compress_init initFn; __data_compress_l2_fn_t comprFn; __data_decompress_l2_fn_t decomprFn; -} TCompressL2FnSet; +} TCmprL2FnSet; -typedef struct { - int8_t type; - int8_t level; - __data_compress_init initFn; - __data_compress_l1_fn_t l1CmprFn; - __data_decompress_l1_fn_t l1DecmprFn; - __data_compress_l2_fn_t l2CmprFn; - __data_decompress_l2_fn_t l2DecmprFn; -} TCompressPara; - -typedef enum L1Compress { +typedef enum { L1_UNKNOWN = 0, L1_SIMPLE_8B, L1_XOR, L1_RLE, L1_DELTAD, L1_DISABLED = 0xFF, -} EL1CompressFuncType; +} TCmprL1Type; -typedef enum L2Compress { +typedef enum { L2_UNKNOWN = 0, L2_LZ4, L2_ZLIB, @@ -272,7 +262,20 @@ typedef enum L2Compress { L2_TSZ, L2_XZ, L2_DISABLED = 0xFF, -} EL2ComressFuncType; +} TCmprL2Type; + +typedef enum { + L2_LVL_NOCHANGE = 0, + L2_LVL_LOW, + L2_LVL_MEDIUM, + L2_LVL_HIGH, + L2_LVL_DISABLED = 0xFF, +} TCmprLvlType; + +typedef struct { + char *name; + uint8_t lvl[3]; // lvl[0] = level for L2_LVL_LOW, lvl[1] = level for L2_LVL_MEDIUM, lvl[2] = level for L2_LVL_HIGH +} TCmprLvlSet; int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level); diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 9c8c1acfb9..d36c67c452 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -179,6 +179,7 @@ 
int32_t l2DecompressImpl_tsz(const char *const input, const int32_t inputSize, c #if defined(WINDOWS) || defined(_TD_DARWIN_64) // do nothing #else + int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals, int32_t ifAdtFse, const char *compressor) { return 0; @@ -226,7 +227,7 @@ int32_t l2ComressInitImpl_zstd(char *lossyColumns, float fPrecision, double dPre int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, const char type, int8_t lvl) { - size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, ZSTD_CLEVEL_DEFAULT); + size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, lvl); if (len > inputSize) { output[0] = 0; memcpy(output + 1, input, inputSize); @@ -253,7 +254,7 @@ int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPreci } int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, const char type, int8_t lvl) { - size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, 0); + size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl); if (len > inputSize) { output[0] = 0; memcpy(output + 1, input, inputSize); @@ -274,14 +275,14 @@ int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSiz } #endif -TCompressL1FnSet compressL1Dict[] = {{"PLAIN", NULL, tsCompressPlain2, tsDecompressPlain2}, - {"SIMPLE-8B", NULL, tsCompressINTImp2, tsDecompressINTImp2}, - {"DELTAI", NULL, tsCompressTimestampImp2, tsDecompressTimestampImp2}, - {"BIT-PACKING", NULL, tsCompressBoolImp2, tsDecompressBoolImp2}, - {"DELTAD", NULL, tsCompressDoubleImp2, tsDecompressDoubleImp2}}; +TCmprL1FnSet compressL1Dict[] = {{"PLAIN", NULL, tsCompressPlain2, tsDecompressPlain2}, + {"SIMPLE-8B", NULL, tsCompressINTImp2, tsDecompressINTImp2}, + {"DELTAI", NULL, tsCompressTimestampImp2, 
tsDecompressTimestampImp2}, + {"BIT-PACKING", NULL, tsCompressBoolImp2, tsDecompressBoolImp2}, + {"DELTAD", NULL, tsCompressDoubleImp2, tsDecompressDoubleImp2}}; #if defined(WINDOWS) || defined(_TD_DARWIN_64) -TCompressL2FnSet compressL2Dict[] = { +TCmprL2FnSet compressL2Dict[] = { {"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled}, {"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}, {"zlib", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}, @@ -289,7 +290,7 @@ TCompressL2FnSet compressL2Dict[] = { {"tsz", l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz}, {"xz", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}}; #else -TCompressL2FnSet compressL2Dict[] = { +TCmprL2FnSet compressL2Dict[] = { {"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled}, {"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}, {"zlib", l2ComressInitImpl_zlib, l2CompressImpl_zlib, l2DecompressImpl_zlib}, @@ -297,8 +298,23 @@ TCompressL2FnSet compressL2Dict[] = { {"tsz", l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz}, {"xz", l2ComressInitImpl_xz, l2CompressImpl_xz, l2DecompressImpl_xz}}; +TCmprLvlSet compressL2LevelDict[] = { + {"unknown", .lvl = {1, 2, 3}}, {"lz4", .lvl = {1, 2, 3}}, {"zlib", .lvl = {1, 6, 9}}, + {"zstd", .lvl = {1, 11, 22}}, {"tsz", .lvl = {1, 2, 3}}, {"xz", .lvl = {1, 6, 9}}, +}; #endif +int8_t tsGetCompressL2Level(uint8_t alg, uint8_t lvl) { + if (lvl == L2_LVL_LOW) { + return compressL2LevelDict[alg].lvl[0]; + } else if (lvl == L2_LVL_MEDIUM) { + return compressL2LevelDict[alg].lvl[1]; + } else if (lvl == L2_LVL_HIGH) { + return compressL2LevelDict[alg].lvl[2]; + } + return 1; +} + static const int32_t TEST_NUMBER = 1; #define is_bigendian() ((*(char *)&TEST_NUMBER) == 0) #define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL) @@ -2704,7 +2720,8 @@ int32_t tsDecompressBigint(void *pIn, 
int32_t nIn, int32_t nEle, void *pOut, int uTrace("encode:%s, compress:%s, level:%d, type:%s, l1:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, \ lvl, tDataTypes[type].name, l1); \ int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \ - return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, lvl); \ + int8_t alvl = tsGetCompressL2Level(l2, lvl); \ + return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, alvl); \ } else { \ uTrace("dencode:%s, decompress:%s, level:%d, type:%s", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl, \ tDataTypes[type].name); \ @@ -2715,7 +2732,8 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int if (compress) { \ uTrace("encode:%s, compress:%s, level:%d, type:%s", "disabled", compressL2Dict[l1].name, lvl, \ tDataTypes[type].name); \ - return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, lvl); \ + int8_t alvl = tsGetCompressL2Level(l2, lvl); \ + return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, alvl); \ } else { \ uTrace("dencode:%s, dcompress:%s, level:%d, type:%s", "disabled", compressL2Dict[l1].name, lvl, \ tDataTypes[type].name); \ @@ -2913,127 +2931,6 @@ int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, in FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0); } -// int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn); - -// int32_t tsCompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, -// void *pBuf, int32_t nBuf) { -// TCompressL1FnSet fn1; -// TCompressL2FnSet fn2; - -// if (tsFindCompressAlg(type, cmprAlg, &fn1, &fn2)) return -1; - -// int32_t len = 0; -// uint8_t l1 = COMPRESS_L1_TYPE_U8(cmprAlg); -// uint8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg); -// uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg); - -// if (l2 == L2_DISABLED) { -// len = fn1.comprFn(pIn, nEle, 
pOut, type); -// } else { -// len = fn1.comprFn(pIn, nEle, pBuf, type); -// len = fn2.comprFn(pBuf, len, pOut, nOut, type, lvl); -// } -// return len; -// } -// int32_t tsDecompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t -// cmprAlg, -// void *pBuf, int32_t nBuf) { -// TCompressL1FnSet fn1; -// TCompressL2FnSet fn2; - -// if (tsFindCompressAlg(type, cmprAlg, &fn1, &fn2) != 0) return -1; - -// uint8_t l1 = COMPRESS_L1_TYPE_U8(cmprAlg); -// uint8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg); -// uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg); -// uint32_t len = 0; -// if (l2 == L2_DISABLED) { -// len = fn1.decomprFn(pIn, nEle, pOut, type); -// } else { -// len = fn2.decomprFn(pIn, nIn, pBuf, nBuf, type); -// if (len < 0) return -1; -// len = fn1.decomprFn(pBuf, nEle, pOut, type); -// } -// return len; -// } - -// int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn) { -// uint8_t l1 = COMPRESS_L1_TYPE_U8(compress); -// uint8_t l2 = COMPRESS_L2_TYPE_U8(compress); -// uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(compress); - -// static int32_t l1Sz = sizeof(compressL1Dict) / sizeof(compressL1Dict[0]); -// if (l1 >= l1Sz) return -1; - -// static int32_t l2Sz = sizeof(compressL2Dict) / sizeof(compressL2Dict[0]); -// if (l2 >= l2Sz) return -1; - -// *l1Fn = compressL1Dict[l1]; -// *l2Fn = compressL2Dict[l2]; -// return 0; -// } - -// typedef struct { -// int8_t dtype; -// SArray *l1Set; -// SArray *l2Set; -// } TCompressCompatible; - -// SHashObj *algSet = NULL; - -// int32_t tsCompressSetInit() { -// algSet = taosHashInit(24, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); -// for (int i = TSDB_DATA_TYPE_NULL; i < TSDB_DATA_TYPE_MAX; i++) { -// TCompressCompatible p; -// p.dtype = i; -// p.l1Set = taosArrayInit(4, sizeof(int8_t)); -// p.l2Set = taosArrayInit(4, sizeof(int8_t)); - -// for (int8_t j = L1_DISABLED; j < L1_MAX; j++) { -// 
taosArrayPush(p.l1Set, &j); -// } - -// for (int8_t j = L2_DISABLED; j < L2_MAX; j++) { -// taosArrayPush(p.l2Set, &j); -// } - -// taosHashPut(algSet, &i, sizeof(i), &p, sizeof(TCompressCompatible)); -// } -// return 0; -// } -// int32_t tsCompressSetDestroy() { -// void *p = taosHashIterate(algSet, NULL); -// while (p) { -// TCompressCompatible *v = p; -// taosArrayDestroy(v->l1Set); -// taosArrayDestroy(v->l2Set); - -// taosHashIterate(algSet, p); -// } -// return 0; -// } - -// int32_t tsValidCompressAlgByDataTypes(int8_t type, int8_t compress) { -// // compress alg -// int8_t l1 = COMPRESS_L1_TYPE_U8(compress); -// int8_t l2 = COMPRESS_L2_TYPE_U8(compress); -// int8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(compress); - -// TCompressCompatible *p = taosHashGet(algSet, &type, sizeof(type)); -// if (p == NULL) return -1; - -// if (p->dtype != type) return -1; - -// if (taosArraySearch(p->l1Set, &l1, compareInt8Val, 0) == NULL) { -// return -1; -// } - -// if (taosArraySearch(p->l2Set, &l2, compareInt8Val, 0) == NULL) { -// return -1; -// } -// return 0; -// } - int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level) { DEFINE_VAR(cmprAlg) *l1Alg = l1;