add compress dict

This commit is contained in:
yihaoDeng 2024-02-29 10:40:59 +00:00
parent 3808abf492
commit 9a6a533076
3 changed files with 166 additions and 17 deletions

View File

@ -539,9 +539,19 @@ struct SSchema {
// |----l1 compAlg----|-----l2 compAlg---|---level--|
// |------8bit--------|------16bit-------|---8bit---|
#define COMPRESS_L1_TYPE(type) ((type)&0xFF)
#define COMPRESS_L2_TYPE(type) (((type) >> 8) & 0xFFFF)
#define COMPRESS_L2_TYPE_LEVEL(type) (((type) >> 24) & 0xFF)
#define COMPRESS_L1_TYPE_U32(type) ((type)&0xFF)
#define COMPRESS_L2_TYPE_U32(type) (((type) >> 8) & 0xFFFF)
#define COMPRESS_L2_TYPE_LEVEL_U32(type) (((type) >> 24) & 0xFF)
// compress flag
// |----l2lel--|----l2Alg---|---l1Alg--|
// |----2bit---|----3bit----|---3bit---|
#define COMPRESS_L1_TYPE_U8(type) ((type)&0x07)
#define COMPRESS_L2_TYPE_U8(type) (((type) >> 3) & 0x07)
#define COMPRESS_L2_TYPE_LEVEL_U8(type) (((type) >> 6) & 0x03)
//
struct SSchema2 {
int8_t type;

View File

@ -164,32 +164,51 @@ typedef int32_t (*__data_decompress_l1_fn_t)(const char *const input, const int3
const char type);
typedef int32_t (*__data_compress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output,
const char type);
int32_t outputSize, const char type);
typedef int32_t (*__data_decompress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output,
const char type);
int32_t outputSize, const char type);
typedef struct {
__data_compress_init initFn;
__data_compress_l1_fn_t l1CompFn;
__data_decompress_l1_fn_t l1DeCompFn;
} TCompressL1;
__data_compress_l1_fn_t comprFn;
__data_decompress_l1_fn_t decomprFn;
} TCompressL1FnSet;
typedef struct {
__data_compress_init initFn;
__data_compress_l2_fn_t l2CompFn;
__data_decompress_l2_fn_t l2DeCompFn;
} TCompressL2;
__data_compress_l2_fn_t comprFn;
__data_decompress_l2_fn_t decomprFn;
} TCompressL2FnSet;
typedef struct {
int8_t type;
int8_t level;
__data_compress_init initFn;
__data_compress_l1_fn_t l1CompFn;
__data_decompress_l1_fn_t l1DeCompFn;
__data_compress_l2_fn_t l2CompFn;
__data_decompress_l2_fn_t l2DeCompFn;
__data_compress_l1_fn_t l1CmprFn;
__data_decompress_l1_fn_t l1DecmprFn;
__data_compress_l2_fn_t l2CmprFn;
__data_decompress_l2_fn_t l2DecmprFn;
} TCompressPara;
typedef enum L1Compress {
L1_DISABLED,
L1_SIMPLE_8B,
L1_XOR,
L1_RLE,
L1_MAX,
} EL1CompressFuncType;
typedef enum L2Compress {
L2_DISABLED,
L2_LZ4,
L2_ZLIB,
L2_ZSTD,
L2_TSZ,
L2_XZ,
L2_MAX,
} EL2ComressFuncType;
#ifdef __cplusplus
}
#endif

View File

@ -50,7 +50,9 @@
#define _DEFAULT_SOURCE
#include "tcompression.h"
#include "lz4.h"
#include "tcompare.h"
#include "tlog.h"
#include "tmsg.h"
#ifdef TD_TSZ
#include "td_sz.h"
@ -2393,7 +2395,125 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
}
}
TCompressL1 compressL1Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}};
TCompressL2 compressL2Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}};
int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn);
int32_t tsCompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
void *pBuf, int32_t nBuf) {
TCompressL1FnSet fn1;
TCompressL2FnSet fn2;
if (tsFindCompressAlg(type, cmprAlg, &fn1, &fn2)) return -1;
int32_t len = 0;
int8_t l1 = COMPRESS_L1_TYPE_U8(cmprAlg);
int8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg);
int8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg);
if (l2 == L2_DISABLED) {
len = fn1.comprFn(pIn, nEle, pOut, type);
} else {
len = fn1.comprFn(pIn, nEle, pBuf, type);
len = fn2.comprFn(pBuf, len, pOut, nOut, type);
}
return len;
}
int32_t tsDecompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
void *pBuf, int32_t nBuf) {
TCompressL1FnSet fn1;
TCompressL2FnSet fn2;
if (tsFindCompressAlg(type, cmprAlg, &fn1, &fn2) != 0) return -1;
int8_t l1 = COMPRESS_L1_TYPE_U8(cmprAlg);
int8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg);
int8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg);
int32_t len = 0;
if (l2 == L2_DISABLED) {
len = fn1.decomprFn(pIn, nEle, pOut, type);
} else {
len = fn2.decomprFn(pIn, nIn, pBuf, nBuf, type);
if (len < 0) return -1;
len = fn1.decomprFn(pBuf, nEle, pOut, type);
}
return len;
}
TCompressL1FnSet compressL1Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}};
TCompressL2FnSet compressL2Dict[] = {{NULL, NULL, NULL}, {NULL, NULL, NULL}};
int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn) {
int8_t l1 = COMPRESS_L1_TYPE_U8(compress);
int8_t l2 = COMPRESS_L2_TYPE_U8(compress);
int8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(compress);
static int32_t l1Sz = sizeof(compressL1Dict) / sizeof(compressL1Dict[0]);
if (l1 >= l1Sz) return -1;
static int32_t l2Sz = sizeof(compressL2Dict) / sizeof(compressL2Dict[0]);
if (l2 >= l2Sz) return -1;
*l1Fn = compressL1Dict[l1];
*l2Fn = compressL2Dict[l2];
return 0;
}
typedef struct {
int8_t dtype;
SArray *l1Set;
SArray *l2Set;
} TCompressCompatible;
SHashObj *algSet = NULL;
int32_t tsCompressSetInit() {
algSet = taosHashInit(24, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
for (int i = TSDB_DATA_TYPE_NULL; i < TSDB_DATA_TYPE_MAX; i++) {
TCompressCompatible p;
p.dtype = i;
p.l1Set = taosArrayInit(4, sizeof(int8_t));
p.l2Set = taosArrayInit(4, sizeof(int8_t));
for (int8_t j = L1_DISABLED; j < L1_MAX; j++) {
taosArrayPush(p.l1Set, &j);
}
for (int8_t j = L2_DISABLED; j < L2_MAX; j++) {
taosArrayPush(p.l2Set, &j);
}
taosHashPut(algSet, &i, sizeof(i), &p, sizeof(TCompressCompatible));
}
return 0;
}
int32_t tsCompressSetDestroy() {
void *p = taosHashIterate(algSet, NULL);
while (p) {
TCompressCompatible *v = p;
taosArrayDestroy(v->l1Set);
taosArrayDestroy(v->l2Set);
taosHashIterate(algSet, p);
}
return 0;
}
int32_t tsValidCompressAlgByDataTypes(int8_t type, int8_t compress) {
// compress alg
int8_t l1 = COMPRESS_L1_TYPE_U8(compress);
int8_t l2 = COMPRESS_L2_TYPE_U8(compress);
int8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(compress);
TCompressCompatible *p = taosHashGet(algSet, &type, sizeof(type));
if (p == NULL) return -1;
if (p->dtype != type) return -1;
if (taosArraySearch(p->l1Set, &l1, compareInt8Val, 0) == NULL) {
return -1;
}
if (taosArraySearch(p->l2Set, &l2, compareInt8Val, 0) == NULL) {
return -1;
}
return 0;
}