diff --git a/include/common/tcol.h b/include/common/tcol.h index 11b74d2a04..32fbe33290 100644 --- a/include/common/tcol.h +++ b/include/common/tcol.h @@ -17,13 +17,13 @@ #ifndef _TD_TCOL_H_ #define _TD_TCOL_H_ -#define TSDB_COLUMN_ENCODE_UNKNOWN "unknown" +#define TSDB_COLUMN_ENCODE_UNKNOWN "Unknown" #define TSDB_COLUMN_ENCODE_SIMPLE8B "simple8b" #define TSDB_COLUMN_ENCODE_XOR "xor" #define TSDB_COLUMN_ENCODE_RLE "rle" #define TSDB_COLUMN_ENCODE_DISABLED "disabled" -#define TSDB_COLUMN_COMPRESS_UNKNOWN "unknown" +#define TSDB_COLUMN_COMPRESS_UNKNOWN "Unknown" #define TSDB_COLUMN_COMPRESS_LZ4 "lz4" #define TSDB_COLUMN_COMPRESS_ZLIB "zlib" #define TSDB_COLUMN_COMPRESS_ZSTD "zstd" @@ -31,16 +31,16 @@ #define TSDB_COLUMN_COMPRESS_XZ "xz" #define TSDB_COLUMN_COMPRESS_DISABLED "disabled" -#define TSDB_COLUMN_LEVEL_UNKNOWN "unknown" +#define TSDB_COLUMN_LEVEL_UNKNOWN "Unknown" #define TSDB_COLUMN_LEVEL_HIGH "high" #define TSDB_COLUMN_LEVEL_MEDIUM "medium" #define TSDB_COLUMN_LEVEL_LOW "low" -#define TSDB_COLVAL_ENCODE_NOCHANGE 0 -#define TSDB_COLVAL_ENCODE_SIMPLE8B 1 -#define TSDB_COLVAL_ENCODE_XOR 2 -#define TSDB_COLVAL_ENCODE_RLE 3 -#define TSDB_COLVAL_ENCODE_DISABLED 0xff +#define TSDB_COLVAL_ENCODE_NOCHANGE 0 +#define TSDB_COLVAL_ENCODE_SIMPLE8B 1 +#define TSDB_COLVAL_ENCODE_XOR 2 +#define TSDB_COLVAL_ENCODE_RLE 3 +#define TSDB_COLVAL_ENCODE_DISABLED 0xff #define TSDB_COLVAL_COMPRESS_NOCHANGE 0 #define TSDB_COLVAL_COMPRESS_LZ4 1 @@ -50,12 +50,13 @@ #define TSDB_COLVAL_COMPRESS_XZ 5 #define TSDB_COLVAL_COMPRESS_DISABLED 0xff -#define TSDB_COLVAL_LEVEL_NOCHANGE 0 -#define TSDB_COLVAL_LEVEL_HIGH 1 -#define TSDB_COLVAL_LEVEL_MEDIUM 2 -#define TSDB_COLVAL_LEVEL_LOW 3 +#define TSDB_COLVAL_LEVEL_NOCHANGE 0 +#define TSDB_COLVAL_LEVEL_HIGH 1 +#define TSDB_COLVAL_LEVEL_MEDIUM 2 +#define TSDB_COLVAL_LEVEL_LOW 3 +#define TSDB_COLVAL_LEVEL_DISABLED 0xff -#define TSDB_CL_COMMENT_LEN 1025 +#define TSDB_CL_COMMENT_LEN 1025 #define TSDB_CL_COMPRESS_OPTION_LEN 12 extern const char* supportedEncode[4]; diff --git a/include/util/tcompression.h b/include/util/tcompression.h index af4375c08e..8e9c845547 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -256,21 +256,21 @@ typedef struct { } TCompressPara; typedef enum L1Compress { - L1_DISABLED, + L1_UNKNOWN, L1_SIMPLE_8B, L1_XOR, L1_RLE, - L1_MAX, + L1_DISABLED = 0xFF, } EL1CompressFuncType; typedef enum L2Compress { - L2_DISABLED, + L2_UNKNOWN, L2_LZ4, L2_ZLIB, L2_ZSTD, L2_TSZ, L2_XZ, - L2_MAX, + L2_DISABLED = 0xFF, } EL2ComressFuncType; int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level); @@ -289,7 +289,8 @@ int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t (cmpr) &= 0xFFFFFF00; \ (cmpr) |= (lvl); \ } while (0) -int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint32_t *dst); +int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint8_t l2Disabled, uint8_t lvlDisabled, uint8_t lvlDefault, + uint32_t *dst); #ifdef __cplusplus } #endif diff --git a/source/common/src/tcol.c b/source/common/src/tcol.c index 9e6f8d50ec..61243b69d9 100644 --- a/source/common/src/tcol.c +++ b/source/common/src/tcol.c @@ -208,6 +208,7 @@ const char* columnLevelStr(uint8_t type) { level = TSDB_COLUMN_LEVEL_LOW; break; default: + level = TSDB_COLUMN_LEVEL_UNKNOWN; break; } return level; @@ -286,8 +287,13 @@ void setColLevel(uint32_t* compress, uint8_t level) { void setColCompressByOption(uint32_t* compress, uint8_t encode, uint16_t compressType, uint8_t level) { setColEncode(compress, encode); - setColCompress(compress, compressType); - setColLevel(compress, level); + if (compressType == TSDB_COLVAL_COMPRESS_DISABLED) { + setColCompress(compress, compressType); + setColLevel(compress, TSDB_COLVAL_LEVEL_DISABLED); + } else { + setColCompress(compress, compressType); + setColLevel(compress, level); + } return; } diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 4689daa347..5400d05da2 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -4232,7 +4232,8 @@ int32_t tCompressData(void *input, // input ASSERT(outputSize >= extraSizeNeeded); DEFINE_VAR(info->cmprAlg) - if (info->cmprAlg == NO_COMPRESSION || (l1 == L1_DISABLED && l2 == L2_DISABLED)) { + if (info->cmprAlg == NO_COMPRESSION || (l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) || + (l1 == L1_DISABLED && l2 == L2_DISABLED)) { memcpy(output, input, info->originalSize); info->compressedSize = info->originalSize; } else if (info->cmprAlg == TWO_STAGE_COMP) { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 60933ee656..874c9cf6fe 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1685,7 +1685,8 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj * SColCmpr *pCmpr = &pNew->pCmpr[i]; if (pCmpr->id == colId) { uint32_t dst = 0; - updated = tUpdateCompress(pCmpr->alg, p->bytes, &dst); + updated = tUpdateCompress(pCmpr->alg, p->bytes, TSDB_COLVAL_COMPRESS_DISABLED, TSDB_COLVAL_LEVEL_DISABLED, + TSDB_COLVAL_LEVEL_MEDIUM, &dst); if (updated) pCmpr->alg = dst; break; } @@ -2351,6 +2352,8 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p taosRUnLockLatch(&pOld->lock); stbObj.pColumns = NULL; stbObj.pTags = NULL; + stbObj.pFuncs = NULL; + stbObj.pCmpr = NULL; stbObj.updateTime = taosGetTimestampMs(); stbObj.lock = 0; bool updateTagIndex = false; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 3d92af0a6f..537d3c7ad8 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -2187,7 +2187,8 @@ int32_t metaUpdateTableColCompress(SMeta *pMeta, int64_t version, SVAlterTbReq * SColCmpr *p = &wp->pColCmpr[i]; if (p->id == pReq->colId) { uint32_t dst = 0; - updated = tUpdateCompress(p->alg, pReq->compress, &dst); + updated = tUpdateCompress(p->alg, pReq->compress, TSDB_COLVAL_COMPRESS_DISABLED, TSDB_COLVAL_LEVEL_DISABLED, + TSDB_COLVAL_LEVEL_MEDIUM, &dst); if (updated) { p->alg = dst; } diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 27b38648a6..68fca291ba 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -52,6 +52,7 @@ #include "lz4.h" #include "tcompare.h" #include "tlog.h" +#include "ttypes.h" // #include "tmsg.h" #include "fast-lzma2.h" @@ -62,6 +63,8 @@ #include "td_sz.h" #endif +int32_t tsCompressUnknow2(const char *const input, const int32_t nelements, char *const output, const char type); +int32_t tsDecompressUnknow2(const char *const input, const int32_t nelements, char *const output, const char type); // delta int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type); @@ -90,13 +93,16 @@ int32_t l2ComressInitImpl_disabled(char *lossyColumns, float fPrecision, double return 0; } -int32_t l2CompressImpl_disabled(const char *const input, const int32_t nelements, char *const output, +int32_t l2CompressImpl_disabled(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, const char type, int8_t lvl) { - return 0; + output[0] = 0; + memcpy(output + 1, input, inputSize); + return inputSize + 1; } -int32_t l2DecompressImpl_disabled(const char *const input, const int32_t nelements, char *const output, +int32_t l2DecompressImpl_disabled(const char *const input, const int32_t compressedSize, char *const output, int32_t outputSize, const char type) { - return 0; + memcpy(output, input + 1, compressedSize - 1); + return compressedSize - 1; } int32_t l2ComressInitImpl_lz4(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals, int32_t ifAdtFse, const char *compressor) { @@ -260,14 +266,14 @@ int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSiz return -1; } -TCompressL1FnSet compressL1Dict[] = {{"distabled", NULL, NULL, NULL}, +TCompressL1FnSet compressL1Dict[] = {{"unknown", NULL, tsCompressUnknow2, tsDecompressUnknow2}, {"timestamp", NULL, tsCompressTimestampImp2, tsDecompressTimestampImp2}, {"int", NULL, tsCompressINTImp2, tsDecompressINTImp2}, {"double", NULL, tsCompressDoubleImp2, tsDecompressDoubleImp2}, {"bool", NULL, tsCompressBoolImp2, tsDecompressBoolImp2}, {NULL, NULL, NULL}}; TCompressL2FnSet compressL2Dict[] = { - {"disabled", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled}, + {"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled}, {"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}, {"zlib", l2ComressInitImpl_zlib, l2CompressImpl_zlib, l2DecompressImpl_zlib}, {"zstd", l2ComressInitImpl_zstd, l2CompressImpl_zstd, l2DecompressImpl_zstd}, @@ -851,6 +857,18 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement return nelements * longBytes; } + +int32_t tsCompressUnknow2(const char *const input, const int32_t nelements, char *const output, const char type) { + int32_t bytes = tDataTypes[type].type * nelements; + output[0] = 0; + memcpy(output + 1, input, bytes); + return bytes + 1; +} +int32_t tsDecompressUnknow2(const char *const input, const int32_t nelements, char *const output, const char type) { + int32_t bytes = tDataTypes[type].bytes * nelements; + memcpy(output, input + 1, bytes); + return bytes; +} int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type) { return tsCompressTimestampImp(input, nelements, output); } @@ -2649,38 +2667,39 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int } } -#define FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \ - do { \ - DEFINE_VAR(cmprAlg) \ - if (compress) { \ - uTrace("encode:%s, compress:%s, level:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl); \ - } else { \ - uTrace("decode:%s, decompress:%s, level:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl); \ - } \ - if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \ - if (compress) { \ - return compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \ - } else { \ - return compressL1Dict[l1].decomprFn(pIn, nEle, pBuf, type); \ - } \ - } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ - if (compress) { \ - int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \ - return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, lvl); \ - } else { \ - if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \ - return compressL1Dict[l1].decomprFn(pBuf, nEle, pOut, type); \ - } \ - } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ - if (compress) { \ - return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, lvl); \ - } else { \ - return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, type); \ - } \ - } else { \ - ASSERT(0); \ - } \ - return -1; \ +#define FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmrlAlg, pBuf, nBuf, type, compress) \ + do { \ + DEFINE_VAR(cmprAlg) \ + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { \ + if (compress) { \ + uTrace("encode:%s, compress:%s, level:%d", compressL1Dict[l1].name, "disabled", "disabled"); \ + return compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \ + } else { \ + uTrace("dencode:%s, compress:%s, level:%d", compressL1Dict[l1].name, "disabled", "disabled"); \ + return compressL1Dict[l1].decomprFn(pIn, nEle, pBuf, type); \ + } \ + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ + if (compress) { \ + uTrace("encode:%s, compress:%s, level:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl); \ + int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \ + return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, lvl); \ + } else { \ + uTrace("dencode:%s, decompress:%s, level:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl); \ + if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \ + return compressL1Dict[l1].decomprFn(pBuf, nEle, pOut, type); \ + } \ + } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ + if (compress) { \ + uTrace("encode:%s, compress:%s, level:%d", "disabled", compressL2Dict[l1].name, lvl); \ + return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, lvl); \ + } else { \ + uTrace("dencode:%s, dcompress:%s, level:%d", "disabled", compressL2Dict[l1].name, lvl); \ + return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, type); \ + } \ + } else { \ + ASSERT(0); \ + } \ + return -1; \ } while (1) /************************************************************************* @@ -2873,26 +2892,26 @@ typedef struct { SHashObj *algSet = NULL; -int32_t tsCompressSetInit() { - algSet = taosHashInit(24, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); - for (int i = TSDB_DATA_TYPE_NULL; i < TSDB_DATA_TYPE_MAX; i++) { - TCompressCompatible p; - p.dtype = i; - p.l1Set = taosArrayInit(4, sizeof(int8_t)); - p.l2Set = taosArrayInit(4, sizeof(int8_t)); +// int32_t tsCompressSetInit() { +// algSet = taosHashInit(24, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); +// for (int i = TSDB_DATA_TYPE_NULL; i < TSDB_DATA_TYPE_MAX; i++) { +// TCompressCompatible p; +// p.dtype = i; +// p.l1Set = taosArrayInit(4, sizeof(int8_t)); +// p.l2Set = taosArrayInit(4, sizeof(int8_t)); - for (int8_t j = L1_DISABLED; j < L1_MAX; j++) { - taosArrayPush(p.l1Set, &j); - } +// for (int8_t j = L1_DISABLED; j < L1_MAX; j++) { +// taosArrayPush(p.l1Set, &j); +// } - for (int8_t j = L2_DISABLED; j < L2_MAX; j++) { - taosArrayPush(p.l2Set, &j); - } +// for (int8_t j = L2_DISABLED; j < L2_MAX; j++) { +// taosArrayPush(p.l2Set, &j); +// } - taosHashPut(algSet, &i, sizeof(i), &p, sizeof(TCompressCompatible)); - } - return 0; -} +// taosHashPut(algSet, &i, sizeof(i), &p, sizeof(TCompressCompatible)); +// } +// return 0; +// } int32_t tsCompressSetDestroy() { void *p = taosHashIterate(algSet, NULL); while (p) { @@ -2933,7 +2952,8 @@ int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level = lvl; return 0; } -int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint32_t *dst) { +int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint8_t l2Disabled, uint8_t lvlDiabled, uint8_t lvlDefault, + uint32_t *dst) { uint8_t ol1 = COMPRESS_L1_TYPE_U32(oldCmpr); uint8_t ol2 = COMPRESS_L2_TYPE_U32(oldCmpr); uint8_t olvl = COMPRESS_L2_TYPE_LEVEL_U32(oldCmpr); @@ -2945,11 +2965,20 @@ int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint32_t *dst) { SET_COMPRESS(nl1, ol2, olvl, *dst); return 1; } else if (nl2 != 0 && ol2 != nl2) { - SET_COMPRESS(ol1, nl2, olvl, *dst); + if (nl2 == l2Disabled) { + SET_COMPRESS(ol1, nl2, lvlDiabled, *dst); + } else { + if (ol2 == l2Disabled) { + SET_COMPRESS(ol1, nl2, lvlDefault, *dst); + } else { + SET_COMPRESS(ol1, nl2, olvl, *dst); + } + } return 1; } else if (nlvl != 0 && olvl != nlvl) { SET_COMPRESS(ol1, ol2, nlvl, *dst); return 1; } + return 0; }