From 9a6a533076322406154c7f4dca42bf201f074488 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 29 Feb 2024 10:40:59 +0000 Subject: [PATCH] add compress dict --- include/common/tmsg.h | 16 ++++- include/util/tcompression.h | 43 ++++++++---- source/util/src/tcompression.c | 124 ++++++++++++++++++++++++++++++++- 3 files changed, 166 insertions(+), 17 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b74befcdd3..af848fce60 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -539,9 +539,19 @@ struct SSchema { // |----l1 compAlg----|-----l2 compAlg---|---level--| // |------8bit--------|------16bit-------|---8bit---| -#define COMPRESS_L1_TYPE(type) ((type)&0xFF) -#define COMPRESS_L2_TYPE(type) (((type) >> 8) & 0xFFFF) -#define COMPRESS_L2_TYPE_LEVEL(type) (((type) >> 24) & 0xFF) +#define COMPRESS_L1_TYPE_U32(type) ((type)&0xFF) +#define COMPRESS_L2_TYPE_U32(type) (((type) >> 8) & 0xFFFF) +#define COMPRESS_L2_TYPE_LEVEL_U32(type) (((type) >> 24) & 0xFF) + +// compress flag +// |----l2lel--|----l2Alg---|---l1Alg--| +// |----2bit---|----3bit----|---3bit---| + +#define COMPRESS_L1_TYPE_U8(type) ((type)&0x07) +#define COMPRESS_L2_TYPE_U8(type) (((type) >> 3) & 0x07) +#define COMPRESS_L2_TYPE_LEVEL_U8(type) (((type) >> 6) & 0x03) + +// struct SSchema2 { int8_t type; diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 7e3073dfde..8dd435022c 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -164,32 +164,51 @@ typedef int32_t (*__data_decompress_l1_fn_t)(const char *const input, const int3 const char type); typedef int32_t (*__data_compress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output, - const char type); + int32_t outputSize, const char type); typedef int32_t (*__data_decompress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output, - const char type); + int32_t outputSize, const char type); typedef struct { __data_compress_init initFn; - __data_compress_l1_fn_t l1CompFn; - __data_decompress_l1_fn_t l1DeCompFn; -} TCompressL1; + __data_compress_l1_fn_t comprFn; + __data_decompress_l1_fn_t decomprFn; +} TCompressL1FnSet; typedef struct { __data_compress_init initFn; - __data_compress_l2_fn_t l2CompFn; - __data_decompress_l2_fn_t l2DeCompFn; -} TCompressL2; + __data_compress_l2_fn_t comprFn; + __data_decompress_l2_fn_t decomprFn; + +} TCompressL2FnSet; typedef struct { int8_t type; int8_t level; __data_compress_init initFn; - __data_compress_l1_fn_t l1CompFn; - __data_decompress_l1_fn_t l1DeCompFn; - __data_compress_l2_fn_t l2CompFn; - __data_decompress_l2_fn_t l2DeCompFn; + __data_compress_l1_fn_t l1CmprFn; + __data_decompress_l1_fn_t l1DecmprFn; + __data_compress_l2_fn_t l2CmprFn; + __data_decompress_l2_fn_t l2DecmprFn; } TCompressPara; +typedef enum L1Compress { + L1_DISABLED, + L1_SIMPLE_8B, + L1_XOR, + L1_RLE, + L1_MAX, +} EL1CompressFuncType; + +typedef enum L2Compress { + L2_DISABLED, + L2_LZ4, + L2_ZLIB, + L2_ZSTD, + L2_TSZ, + L2_XZ, + L2_MAX, +} EL2ComressFuncType; + #ifdef __cplusplus } #endif diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 7c5b66226e..8b5f268f81 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -50,7 +50,9 @@ #define _DEFAULT_SOURCE #include "tcompression.h" #include "lz4.h" +#include "tcompare.h" #include "tlog.h" +#include "tmsg.h" #ifdef TD_TSZ #include "td_sz.h" @@ -2393,7 +2395,125 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int } } -TCompressL1 compressL1Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}}; -TCompressL2 compressL2Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}}; +int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn); +int32_t tsCompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, + void *pBuf, int32_t nBuf) { + TCompressL1FnSet fn1; + TCompressL2FnSet fn2; + if (tsFindCompressAlg(type, cmprAlg, &fn1, &fn2)) return -1; + + int32_t len = 0; + int8_t l1 = COMPRESS_L1_TYPE_U8(cmprAlg); + int8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg); + int8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg); + + if (l2 == L2_DISABLED) { + len = fn1.comprFn(pIn, nEle, pOut, type); + } else { + len = fn1.comprFn(pIn, nEle, pBuf, type); + len = fn2.comprFn(pBuf, len, pOut, nOut, type); + } + return len; +} +int32_t tsDecompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, + void *pBuf, int32_t nBuf) { + TCompressL1FnSet fn1; + TCompressL2FnSet fn2; + + if (tsFindCompressAlg(type, cmprAlg, &fn1, &fn2) != 0) return -1; + + int8_t l1 = COMPRESS_L1_TYPE_U8(cmprAlg); + int8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg); + int8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg); + int32_t len = 0; + if (l2 == L2_DISABLED) { + len = fn1.decomprFn(pIn, nEle, pOut, type); + } else { + len = fn2.decomprFn(pIn, nIn, pBuf, nBuf, type); + if (len < 0) return -1; + len = fn1.decomprFn(pBuf, nEle, pOut, type); + } + return len; +} + +TCompressL1FnSet compressL1Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}}; +TCompressL2FnSet compressL2Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}}; + +int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn) { + int8_t l1 = COMPRESS_L1_TYPE_U8(compress); + int8_t l2 = COMPRESS_L2_TYPE_U8(compress); + int8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(compress); + + static int32_t l1Sz = sizeof(compressL1Dict) / sizeof(compressL1Dict[0]); + if (l1 >= l1Sz) return -1; + + static int32_t l2Sz = sizeof(compressL2Dict) / sizeof(compressL2Dict[0]); + if (l2 >= l2Sz) return -1; + + *l1Fn = compressL1Dict[l1]; + *l2Fn = compressL2Dict[l2]; + return 0; +} + +typedef struct { + int8_t dtype; + SArray *l1Set; + SArray *l2Set; +} TCompressCompatible; + +SHashObj *algSet = NULL; + +int32_t tsCompressSetInit() { + algSet = taosHashInit(24, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); + for (int i = TSDB_DATA_TYPE_NULL; i < TSDB_DATA_TYPE_MAX; i++) { + TCompressCompatible p; + p.dtype = i; + p.l1Set = taosArrayInit(4, sizeof(int8_t)); + p.l2Set = taosArrayInit(4, sizeof(int8_t)); + + for (int8_t j = L1_DISABLED; j < L1_MAX; j++) { + taosArrayPush(p.l1Set, &j); + } + + for (int8_t j = L2_DISABLED; j < L2_MAX; j++) { + taosArrayPush(p.l2Set, &j); + } + + taosHashPut(algSet, &i, sizeof(i), &p, sizeof(TCompressCompatible)); + } + return 0; +} +int32_t tsCompressSetDestroy() { + void *p = taosHashIterate(algSet, NULL); + while (p) { + TCompressCompatible *v = p; + taosArrayDestroy(v->l1Set); + taosArrayDestroy(v->l2Set); + + taosHashIterate(algSet, p); + } + return 0; +} + +int32_t tsValidCompressAlgByDataTypes(int8_t type, int8_t compress) { + // compress alg + int8_t l1 = COMPRESS_L1_TYPE_U8(compress); + int8_t l2 = COMPRESS_L2_TYPE_U8(compress); + int8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(compress); + + TCompressCompatible *p = taosHashGet(algSet, &type, sizeof(type)); + if (p == NULL) return -1; + + if (p->dtype != type) return -1; + + if (taosArraySearch(p->l1Set, &l1, compareInt8Val, 0) == NULL) { + return -1; + } + + if (taosArraySearch(p->l2Set, &l2, compareInt8Val, 0) == NULL) { + return -1; + } + return 0; +}