add tmq interface

This commit is contained in:
Yihao Deng 2024-04-22 03:56:45 +00:00
parent f0147770e0
commit f3ac2e1ef9
2 changed files with 48 additions and 148 deletions

View File

@ -236,35 +236,25 @@ typedef struct {
__data_compress_init initFn; __data_compress_init initFn;
__data_compress_l1_fn_t comprFn; __data_compress_l1_fn_t comprFn;
__data_decompress_l1_fn_t decomprFn; __data_decompress_l1_fn_t decomprFn;
} TCompressL1FnSet; } TCmprL1FnSet;
typedef struct { typedef struct {
char *name; char *name;
__data_compress_init initFn; __data_compress_init initFn;
__data_compress_l2_fn_t comprFn; __data_compress_l2_fn_t comprFn;
__data_decompress_l2_fn_t decomprFn; __data_decompress_l2_fn_t decomprFn;
} TCompressL2FnSet; } TCmprL2FnSet;
typedef struct { typedef enum {
int8_t type;
int8_t level;
__data_compress_init initFn;
__data_compress_l1_fn_t l1CmprFn;
__data_decompress_l1_fn_t l1DecmprFn;
__data_compress_l2_fn_t l2CmprFn;
__data_decompress_l2_fn_t l2DecmprFn;
} TCompressPara;
typedef enum L1Compress {
L1_UNKNOWN = 0, L1_UNKNOWN = 0,
L1_SIMPLE_8B, L1_SIMPLE_8B,
L1_XOR, L1_XOR,
L1_RLE, L1_RLE,
L1_DELTAD, L1_DELTAD,
L1_DISABLED = 0xFF, L1_DISABLED = 0xFF,
} EL1CompressFuncType; } TCmprL1Type;
typedef enum L2Compress { typedef enum {
L2_UNKNOWN = 0, L2_UNKNOWN = 0,
L2_LZ4, L2_LZ4,
L2_ZLIB, L2_ZLIB,
@ -272,7 +262,20 @@ typedef enum L2Compress {
L2_TSZ, L2_TSZ,
L2_XZ, L2_XZ,
L2_DISABLED = 0xFF, L2_DISABLED = 0xFF,
} EL2ComressFuncType; } TCmprL2Type;
typedef enum {
L2_LVL_NOCHANGE = 0,
L2_LVL_LOW,
L2_LVL_MEDIUM,
L2_LVL_HIGH,
L2_LVL_DISABLED = 0xFF,
} TCmprLvlType;
typedef struct {
char *name;
uint8_t lvl[3]; // l[0] = 'low', l[1] = 'mid', l[2] = 'high'
} TCmprLvlSet;
int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level); int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level);

View File

@ -179,6 +179,7 @@ int32_t l2DecompressImpl_tsz(const char *const input, const int32_t inputSize, c
#if defined(WINDOWS) || defined(_TD_DARWIN_64) #if defined(WINDOWS) || defined(_TD_DARWIN_64)
// do nothing // do nothing
#else #else
int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) { uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
return 0; return 0;
@ -226,7 +227,7 @@ int32_t l2ComressInitImpl_zstd(char *lossyColumns, float fPrecision, double dPre
int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type, int8_t lvl) { const char type, int8_t lvl) {
size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, ZSTD_CLEVEL_DEFAULT); size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, lvl);
if (len > inputSize) { if (len > inputSize) {
output[0] = 0; output[0] = 0;
memcpy(output + 1, input, inputSize); memcpy(output + 1, input, inputSize);
@ -253,7 +254,7 @@ int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPreci
} }
int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type, int8_t lvl) { const char type, int8_t lvl) {
size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, 0); size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl);
if (len > inputSize) { if (len > inputSize) {
output[0] = 0; output[0] = 0;
memcpy(output + 1, input, inputSize); memcpy(output + 1, input, inputSize);
@ -274,14 +275,14 @@ int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSiz
} }
#endif #endif
TCompressL1FnSet compressL1Dict[] = {{"PLAIN", NULL, tsCompressPlain2, tsDecompressPlain2}, TCmprL1FnSet compressL1Dict[] = {{"PLAIN", NULL, tsCompressPlain2, tsDecompressPlain2},
{"SIMPLE-8B", NULL, tsCompressINTImp2, tsDecompressINTImp2}, {"SIMPLE-8B", NULL, tsCompressINTImp2, tsDecompressINTImp2},
{"DELTAI", NULL, tsCompressTimestampImp2, tsDecompressTimestampImp2}, {"DELTAI", NULL, tsCompressTimestampImp2, tsDecompressTimestampImp2},
{"BIT-PACKING", NULL, tsCompressBoolImp2, tsDecompressBoolImp2}, {"BIT-PACKING", NULL, tsCompressBoolImp2, tsDecompressBoolImp2},
{"DELTAD", NULL, tsCompressDoubleImp2, tsDecompressDoubleImp2}}; {"DELTAD", NULL, tsCompressDoubleImp2, tsDecompressDoubleImp2}};
#if defined(WINDOWS) || defined(_TD_DARWIN_64) #if defined(WINDOWS) || defined(_TD_DARWIN_64)
TCompressL2FnSet compressL2Dict[] = { TCmprL2FnSet compressL2Dict[] = {
{"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled}, {"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
{"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}, {"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
{"zlib", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}, {"zlib", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
@ -289,7 +290,7 @@ TCompressL2FnSet compressL2Dict[] = {
{"tsz", l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz}, {"tsz", l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz},
{"xz", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}}; {"xz", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}};
#else #else
TCompressL2FnSet compressL2Dict[] = { TCmprL2FnSet compressL2Dict[] = {
{"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled}, {"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
{"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}, {"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
{"zlib", l2ComressInitImpl_zlib, l2CompressImpl_zlib, l2DecompressImpl_zlib}, {"zlib", l2ComressInitImpl_zlib, l2CompressImpl_zlib, l2DecompressImpl_zlib},
@ -297,8 +298,23 @@ TCompressL2FnSet compressL2Dict[] = {
{"tsz", l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz}, {"tsz", l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz},
{"xz", l2ComressInitImpl_xz, l2CompressImpl_xz, l2DecompressImpl_xz}}; {"xz", l2ComressInitImpl_xz, l2CompressImpl_xz, l2DecompressImpl_xz}};
TCmprLvlSet compressL2LevelDict[] = {
{"unknown", .lvl = {1, 2, 3}}, {"lz4", .lvl = {1, 2, 3}}, {"zlib", .lvl = {1, 6, 9}},
{"zstd", .lvl = {1, 11, 22}}, {"tsz", .lvl = {1, 2, 3}}, {"xz", .lvl = {1, 6, 9}},
};
#endif #endif
int8_t tsGetCompressL2Level(uint8_t alg, uint8_t lvl) {
if (lvl == L2_LVL_LOW) {
return compressL2LevelDict[alg].lvl[0];
} else if (lvl == L2_LVL_MEDIUM) {
return compressL2LevelDict[alg].lvl[1];
} else if (lvl == L2_LVL_HIGH) {
return compressL2LevelDict[alg].lvl[2];
}
return 1;
}
static const int32_t TEST_NUMBER = 1; static const int32_t TEST_NUMBER = 1;
#define is_bigendian() ((*(char *)&TEST_NUMBER) == 0) #define is_bigendian() ((*(char *)&TEST_NUMBER) == 0)
#define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL) #define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL)
@ -2704,7 +2720,8 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
uTrace("encode:%s, compress:%s, level:%d, type:%s, l1:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, \ uTrace("encode:%s, compress:%s, level:%d, type:%s, l1:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, \
lvl, tDataTypes[type].name, l1); \ lvl, tDataTypes[type].name, l1); \
int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \ int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, lvl); \ int8_t alvl = tsGetCompressL2Level(l2, lvl); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, alvl); \
} else { \ } else { \
uTrace("dencode:%s, decompress:%s, level:%d, type:%s", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl, \ uTrace("dencode:%s, decompress:%s, level:%d, type:%s", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl, \
tDataTypes[type].name); \ tDataTypes[type].name); \
@ -2715,7 +2732,8 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
if (compress) { \ if (compress) { \
uTrace("encode:%s, compress:%s, level:%d, type:%s", "disabled", compressL2Dict[l1].name, lvl, \ uTrace("encode:%s, compress:%s, level:%d, type:%s", "disabled", compressL2Dict[l1].name, lvl, \
tDataTypes[type].name); \ tDataTypes[type].name); \
return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, lvl); \ int8_t alvl = tsGetCompressL2Level(l2, lvl); \
return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, alvl); \
} else { \ } else { \
uTrace("dencode:%s, dcompress:%s, level:%d, type:%s", "disabled", compressL2Dict[l1].name, lvl, \ uTrace("dencode:%s, dcompress:%s, level:%d, type:%s", "disabled", compressL2Dict[l1].name, lvl, \
tDataTypes[type].name); \ tDataTypes[type].name); \
@ -2913,127 +2931,6 @@ int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, in
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0); FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0);
} }
// int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn);
// int32_t tsCompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
// void *pBuf, int32_t nBuf) {
// TCompressL1FnSet fn1;
// TCompressL2FnSet fn2;
// if (tsFindCompressAlg(type, cmprAlg, &fn1, &fn2)) return -1;
// int32_t len = 0;
// uint8_t l1 = COMPRESS_L1_TYPE_U8(cmprAlg);
// uint8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg);
// uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg);
// if (l2 == L2_DISABLED) {
// len = fn1.comprFn(pIn, nEle, pOut, type);
// } else {
// len = fn1.comprFn(pIn, nEle, pBuf, type);
// len = fn2.comprFn(pBuf, len, pOut, nOut, type, lvl);
// }
// return len;
// }
// int32_t tsDecompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t
// cmprAlg,
// void *pBuf, int32_t nBuf) {
// TCompressL1FnSet fn1;
// TCompressL2FnSet fn2;
// if (tsFindCompressAlg(type, cmprAlg, &fn1, &fn2) != 0) return -1;
// uint8_t l1 = COMPRESS_L1_TYPE_U8(cmprAlg);
// uint8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg);
// uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg);
// uint32_t len = 0;
// if (l2 == L2_DISABLED) {
// len = fn1.decomprFn(pIn, nEle, pOut, type);
// } else {
// len = fn2.decomprFn(pIn, nIn, pBuf, nBuf, type);
// if (len < 0) return -1;
// len = fn1.decomprFn(pBuf, nEle, pOut, type);
// }
// return len;
// }
// int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn) {
// uint8_t l1 = COMPRESS_L1_TYPE_U8(compress);
// uint8_t l2 = COMPRESS_L2_TYPE_U8(compress);
// uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(compress);
// static int32_t l1Sz = sizeof(compressL1Dict) / sizeof(compressL1Dict[0]);
// if (l1 >= l1Sz) return -1;
// static int32_t l2Sz = sizeof(compressL2Dict) / sizeof(compressL2Dict[0]);
// if (l2 >= l2Sz) return -1;
// *l1Fn = compressL1Dict[l1];
// *l2Fn = compressL2Dict[l2];
// return 0;
// }
// typedef struct {
// int8_t dtype;
// SArray *l1Set;
// SArray *l2Set;
// } TCompressCompatible;
// SHashObj *algSet = NULL;
// int32_t tsCompressSetInit() {
// algSet = taosHashInit(24, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
// for (int i = TSDB_DATA_TYPE_NULL; i < TSDB_DATA_TYPE_MAX; i++) {
// TCompressCompatible p;
// p.dtype = i;
// p.l1Set = taosArrayInit(4, sizeof(int8_t));
// p.l2Set = taosArrayInit(4, sizeof(int8_t));
// for (int8_t j = L1_DISABLED; j < L1_MAX; j++) {
// taosArrayPush(p.l1Set, &j);
// }
// for (int8_t j = L2_DISABLED; j < L2_MAX; j++) {
// taosArrayPush(p.l2Set, &j);
// }
// taosHashPut(algSet, &i, sizeof(i), &p, sizeof(TCompressCompatible));
// }
// return 0;
// }
// int32_t tsCompressSetDestroy() {
// void *p = taosHashIterate(algSet, NULL);
// while (p) {
// TCompressCompatible *v = p;
// taosArrayDestroy(v->l1Set);
// taosArrayDestroy(v->l2Set);
// taosHashIterate(algSet, p);
// }
// return 0;
// }
// int32_t tsValidCompressAlgByDataTypes(int8_t type, int8_t compress) {
// // compress alg
// int8_t l1 = COMPRESS_L1_TYPE_U8(compress);
// int8_t l2 = COMPRESS_L2_TYPE_U8(compress);
// int8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(compress);
// TCompressCompatible *p = taosHashGet(algSet, &type, sizeof(type));
// if (p == NULL) return -1;
// if (p->dtype != type) return -1;
// if (taosArraySearch(p->l1Set, &l1, compareInt8Val, 0) == NULL) {
// return -1;
// }
// if (taosArraySearch(p->l2Set, &l2, compareInt8Val, 0) == NULL) {
// return -1;
// }
// return 0;
// }
int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level) { int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level) {
DEFINE_VAR(cmprAlg) DEFINE_VAR(cmprAlg)
*l1Alg = l1; *l1Alg = l1;