refactor code

This commit is contained in:
yihaoDeng 2024-03-26 12:44:53 +00:00
parent 02e0e45183
commit 18986c0ced
7 changed files with 122 additions and 80 deletions

View File

@ -17,13 +17,13 @@
#ifndef _TD_TCOL_H_
#define _TD_TCOL_H_
#define TSDB_COLUMN_ENCODE_UNKNOWN "unknown"
#define TSDB_COLUMN_ENCODE_UNKNOWN "Unknown"
#define TSDB_COLUMN_ENCODE_SIMPLE8B "simple8b"
#define TSDB_COLUMN_ENCODE_XOR "xor"
#define TSDB_COLUMN_ENCODE_RLE "rle"
#define TSDB_COLUMN_ENCODE_DISABLED "disabled"
#define TSDB_COLUMN_COMPRESS_UNKNOWN "unknown"
#define TSDB_COLUMN_COMPRESS_UNKNOWN "Unknown"
#define TSDB_COLUMN_COMPRESS_LZ4 "lz4"
#define TSDB_COLUMN_COMPRESS_ZLIB "zlib"
#define TSDB_COLUMN_COMPRESS_ZSTD "zstd"
@ -31,16 +31,16 @@
#define TSDB_COLUMN_COMPRESS_XZ "xz"
#define TSDB_COLUMN_COMPRESS_DISABLED "disabled"
#define TSDB_COLUMN_LEVEL_UNKNOWN "unknown"
#define TSDB_COLUMN_LEVEL_UNKNOWN "Unknown"
#define TSDB_COLUMN_LEVEL_HIGH "high"
#define TSDB_COLUMN_LEVEL_MEDIUM "medium"
#define TSDB_COLUMN_LEVEL_LOW "low"
#define TSDB_COLVAL_ENCODE_NOCHANGE 0
#define TSDB_COLVAL_ENCODE_SIMPLE8B 1
#define TSDB_COLVAL_ENCODE_XOR 2
#define TSDB_COLVAL_ENCODE_RLE 3
#define TSDB_COLVAL_ENCODE_DISABLED 0xff
#define TSDB_COLVAL_ENCODE_NOCHANGE 0
#define TSDB_COLVAL_ENCODE_SIMPLE8B 1
#define TSDB_COLVAL_ENCODE_XOR 2
#define TSDB_COLVAL_ENCODE_RLE 3
#define TSDB_COLVAL_ENCODE_DISABLED 0xff
#define TSDB_COLVAL_COMPRESS_NOCHANGE 0
#define TSDB_COLVAL_COMPRESS_LZ4 1
@ -50,12 +50,13 @@
#define TSDB_COLVAL_COMPRESS_XZ 5
#define TSDB_COLVAL_COMPRESS_DISABLED 0xff
#define TSDB_COLVAL_LEVEL_NOCHANGE 0
#define TSDB_COLVAL_LEVEL_HIGH 1
#define TSDB_COLVAL_LEVEL_MEDIUM 2
#define TSDB_COLVAL_LEVEL_LOW 3
#define TSDB_COLVAL_LEVEL_NOCHANGE 0
#define TSDB_COLVAL_LEVEL_HIGH 1
#define TSDB_COLVAL_LEVEL_MEDIUM 2
#define TSDB_COLVAL_LEVEL_LOW 3
#define TSDB_COLVAL_LEVEL_DISABLED 0xff
#define TSDB_CL_COMMENT_LEN 1025
#define TSDB_CL_COMMENT_LEN 1025
#define TSDB_CL_COMPRESS_OPTION_LEN 12
extern const char* supportedEncode[4];

View File

@ -256,21 +256,21 @@ typedef struct {
} TCompressPara;
typedef enum L1Compress {
L1_DISABLED,
L1_UNKNOWN,
L1_SIMPLE_8B,
L1_XOR,
L1_RLE,
L1_MAX,
L1_DISABLED = 0xFF,
} EL1CompressFuncType;
typedef enum L2Compress {
L2_DISABLED,
L2_UNKNOWN,
L2_LZ4,
L2_ZLIB,
L2_ZSTD,
L2_TSZ,
L2_XZ,
L2_MAX,
L2_DISABLED = 0xFF,
} EL2ComressFuncType;
int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level);
@ -289,7 +289,8 @@ int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t
(cmpr) &= 0xFFFFFF00; \
(cmpr) |= (lvl); \
} while (0)
int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint32_t *dst);
int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint8_t l2Disabled, uint8_t lvlDisabled, uint8_t lvlDefault,
uint32_t *dst);
#ifdef __cplusplus
}
#endif

View File

@ -208,6 +208,7 @@ const char* columnLevelStr(uint8_t type) {
level = TSDB_COLUMN_LEVEL_LOW;
break;
default:
level = TSDB_COLUMN_LEVEL_UNKNOWN;
break;
}
return level;
@ -286,8 +287,13 @@ void setColLevel(uint32_t* compress, uint8_t level) {
void setColCompressByOption(uint32_t* compress, uint8_t encode, uint16_t compressType, uint8_t level) {
setColEncode(compress, encode);
setColCompress(compress, compressType);
setColLevel(compress, level);
if (compressType == TSDB_COLVAL_COMPRESS_DISABLED) {
setColCompress(compress, compressType);
setColLevel(compress, TSDB_COLVAL_LEVEL_DISABLED);
} else {
setColCompress(compress, compressType);
setColLevel(compress, level);
}
return;
}

View File

@ -4232,7 +4232,8 @@ int32_t tCompressData(void *input, // input
ASSERT(outputSize >= extraSizeNeeded);
DEFINE_VAR(info->cmprAlg)
if (info->cmprAlg == NO_COMPRESSION || (l1 == L1_DISABLED && l2 == L2_DISABLED)) {
if (info->cmprAlg == NO_COMPRESSION || (l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) ||
(l1 == L1_DISABLED && l2 == L2_DISABLED)) {
memcpy(output, input, info->originalSize);
info->compressedSize = info->originalSize;
} else if (info->cmprAlg == TWO_STAGE_COMP) {

View File

@ -1685,7 +1685,8 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *
SColCmpr *pCmpr = &pNew->pCmpr[i];
if (pCmpr->id == colId) {
uint32_t dst = 0;
updated = tUpdateCompress(pCmpr->alg, p->bytes, &dst);
updated = tUpdateCompress(pCmpr->alg, p->bytes, TSDB_COLVAL_COMPRESS_DISABLED, TSDB_COLVAL_LEVEL_DISABLED,
TSDB_COLVAL_LEVEL_MEDIUM, &dst);
if (updated) pCmpr->alg = dst;
break;
}
@ -2351,6 +2352,8 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
taosRUnLockLatch(&pOld->lock);
stbObj.pColumns = NULL;
stbObj.pTags = NULL;
stbObj.pFuncs = NULL;
stbObj.pCmpr = NULL;
stbObj.updateTime = taosGetTimestampMs();
stbObj.lock = 0;
bool updateTagIndex = false;

View File

@ -2187,7 +2187,8 @@ int32_t metaUpdateTableColCompress(SMeta *pMeta, int64_t version, SVAlterTbReq *
SColCmpr *p = &wp->pColCmpr[i];
if (p->id == pReq->colId) {
uint32_t dst = 0;
updated = tUpdateCompress(p->alg, pReq->compress, &dst);
updated = tUpdateCompress(p->alg, pReq->compress, TSDB_COLVAL_COMPRESS_DISABLED, TSDB_COLVAL_LEVEL_DISABLED,
TSDB_COLVAL_LEVEL_MEDIUM, &dst);
if (updated) {
p->alg = dst;
}

View File

@ -52,6 +52,7 @@
#include "lz4.h"
#include "tcompare.h"
#include "tlog.h"
#include "ttypes.h"
// #include "tmsg.h"
#include "fast-lzma2.h"
@ -62,6 +63,8 @@
#include "td_sz.h"
#endif
int32_t tsCompressUnknow2(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressUnknow2(const char *const input, const int32_t nelements, char *const output, const char type);
// delta
int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type);
@ -90,13 +93,16 @@ int32_t l2ComressInitImpl_disabled(char *lossyColumns, float fPrecision, double
return 0;
}
int32_t l2CompressImpl_disabled(const char *const input, const int32_t nelements, char *const output,
int32_t l2CompressImpl_disabled(const char *const input, const int32_t inputSize, char *const output,
int32_t outputSize, const char type, int8_t lvl) {
return 0;
output[0] = 0;
memcpy(output + 1, input, inputSize);
return inputSize + 1;
}
int32_t l2DecompressImpl_disabled(const char *const input, const int32_t nelements, char *const output,
int32_t l2DecompressImpl_disabled(const char *const input, const int32_t compressedSize, char *const output,
int32_t outputSize, const char type) {
return 0;
memcpy(output, input + 1, compressedSize - 1);
return compressedSize - 1;
}
int32_t l2ComressInitImpl_lz4(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
@ -260,14 +266,14 @@ int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSiz
return -1;
}
TCompressL1FnSet compressL1Dict[] = {{"distabled", NULL, NULL, NULL},
TCompressL1FnSet compressL1Dict[] = {{"unknown", NULL, tsCompressUnknow2, tsDecompressUnknow2},
{"timestamp", NULL, tsCompressTimestampImp2, tsDecompressTimestampImp2},
{"int", NULL, tsCompressINTImp2, tsDecompressINTImp2},
{"double", NULL, tsCompressDoubleImp2, tsDecompressDoubleImp2},
{"bool", NULL, tsCompressBoolImp2, tsDecompressBoolImp2},
{NULL, NULL, NULL}};
TCompressL2FnSet compressL2Dict[] = {
{"disabled", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
{"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
{"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
{"zlib", l2ComressInitImpl_zlib, l2CompressImpl_zlib, l2DecompressImpl_zlib},
{"zstd", l2ComressInitImpl_zstd, l2CompressImpl_zstd, l2DecompressImpl_zstd},
@ -851,6 +857,18 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
return nelements * longBytes;
}
int32_t tsCompressUnknow2(const char *const input, const int32_t nelements, char *const output, const char type) {
int32_t bytes = tDataTypes[type].type * nelements;
output[0] = 0;
memcpy(output + 1, input, bytes);
return bytes + 1;
}
int32_t tsDecompressUnknow2(const char *const input, const int32_t nelements, char *const output, const char type) {
int32_t bytes = tDataTypes[type].bytes * nelements;
memcpy(output, input + 1, bytes);
return bytes;
}
int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type) {
return tsCompressTimestampImp(input, nelements, output);
}
@ -2649,38 +2667,39 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
}
}
#define FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \
do { \
DEFINE_VAR(cmprAlg) \
if (compress) { \
uTrace("encode:%s, compress:%s, level:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl); \
} else { \
uTrace("decode:%s, decompress:%s, level:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl); \
} \
if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
if (compress) { \
return compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \
} else { \
return compressL1Dict[l1].decomprFn(pIn, nEle, pBuf, type); \
} \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
if (compress) { \
int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, lvl); \
} else { \
if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \
return compressL1Dict[l1].decomprFn(pBuf, nEle, pOut, type); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
if (compress) { \
return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, lvl); \
} else { \
return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, type); \
} \
} else { \
ASSERT(0); \
} \
return -1; \
#define FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \
do { \
DEFINE_VAR(cmprAlg) \
if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \
if (compress) { \
uTrace("encode:%s, compress:%s, level:%d", compressL1Dict[l1].name, "disabled", "disabled"); \
return compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \
} else { \
uTrace("dencode:%s, compress:%s, level:%d", compressL1Dict[l1].name, "disabled", "disabled"); \
return compressL1Dict[l1].decomprFn(pIn, nEle, pBuf, type); \
} \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
if (compress) { \
uTrace("encode:%s, compress:%s, level:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl); \
int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, lvl); \
} else { \
uTrace("dencode:%s, decompress:%s, level:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl); \
if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \
return compressL1Dict[l1].decomprFn(pBuf, nEle, pOut, type); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
if (compress) { \
uTrace("encode:%s, compress:%s, level:%d", "disabled", compressL2Dict[l1].name, lvl); \
return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, lvl); \
} else { \
uTrace("dencode:%s, dcompress:%s, level:%d", "disabled", compressL2Dict[l1].name, lvl); \
return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, type); \
} \
} else { \
ASSERT(0); \
} \
return -1; \
} while (1)
/*************************************************************************
@ -2873,26 +2892,26 @@ typedef struct {
SHashObj *algSet = NULL;
int32_t tsCompressSetInit() {
algSet = taosHashInit(24, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
for (int i = TSDB_DATA_TYPE_NULL; i < TSDB_DATA_TYPE_MAX; i++) {
TCompressCompatible p;
p.dtype = i;
p.l1Set = taosArrayInit(4, sizeof(int8_t));
p.l2Set = taosArrayInit(4, sizeof(int8_t));
// int32_t tsCompressSetInit() {
// algSet = taosHashInit(24, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
// for (int i = TSDB_DATA_TYPE_NULL; i < TSDB_DATA_TYPE_MAX; i++) {
// TCompressCompatible p;
// p.dtype = i;
// p.l1Set = taosArrayInit(4, sizeof(int8_t));
// p.l2Set = taosArrayInit(4, sizeof(int8_t));
for (int8_t j = L1_DISABLED; j < L1_MAX; j++) {
taosArrayPush(p.l1Set, &j);
}
// for (int8_t j = L1_DISABLED; j < L1_MAX; j++) {
// taosArrayPush(p.l1Set, &j);
// }
for (int8_t j = L2_DISABLED; j < L2_MAX; j++) {
taosArrayPush(p.l2Set, &j);
}
// for (int8_t j = L2_DISABLED; j < L2_MAX; j++) {
// taosArrayPush(p.l2Set, &j);
// }
taosHashPut(algSet, &i, sizeof(i), &p, sizeof(TCompressCompatible));
}
return 0;
}
// taosHashPut(algSet, &i, sizeof(i), &p, sizeof(TCompressCompatible));
// }
// return 0;
// }
int32_t tsCompressSetDestroy() {
void *p = taosHashIterate(algSet, NULL);
while (p) {
@ -2933,7 +2952,8 @@ int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t
*level = lvl;
return 0;
}
int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint32_t *dst) {
int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint8_t l2Disabled, uint8_t lvlDiabled, uint8_t lvlDefault,
uint32_t *dst) {
uint8_t ol1 = COMPRESS_L1_TYPE_U32(oldCmpr);
uint8_t ol2 = COMPRESS_L2_TYPE_U32(oldCmpr);
uint8_t olvl = COMPRESS_L2_TYPE_LEVEL_U32(oldCmpr);
@ -2945,11 +2965,20 @@ int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint32_t *dst) {
SET_COMPRESS(nl1, ol2, olvl, *dst);
return 1;
} else if (nl2 != 0 && ol2 != nl2) {
SET_COMPRESS(ol1, nl2, olvl, *dst);
if (nl2 == l2Disabled) {
SET_COMPRESS(ol1, nl2, lvlDiabled, *dst);
} else {
if (ol2 == l2Disabled) {
SET_COMPRESS(ol1, nl2, lvlDefault, *dst);
} else {
SET_COMPRESS(ol1, nl2, olvl, *dst);
}
}
return 1;
} else if (nlvl != 0 && olvl != nlvl) {
SET_COMPRESS(ol1, ol2, nlvl, *dst);
return 1;
}
return 0;
}