diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 7242c51933..f0e41acbb9 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -28,18 +28,18 @@ extern "C" { #endif -typedef struct SSchema SSchema; -typedef struct SSchema2 SSchema2; +typedef struct SSchema SSchema; +typedef struct SSchema2 SSchema2; typedef struct SSchemaExt SSchemaExt; -typedef struct STColumn STColumn; -typedef struct STSchema STSchema; -typedef struct SValue SValue; -typedef struct SColVal SColVal; -typedef struct SRow SRow; -typedef struct SRowIter SRowIter; -typedef struct STagVal STagVal; -typedef struct STag STag; -typedef struct SColData SColData; +typedef struct STColumn STColumn; +typedef struct STSchema STSchema; +typedef struct SValue SValue; +typedef struct SColVal SColVal; +typedef struct SRow SRow; +typedef struct SRowIter SRowIter; +typedef struct STagVal STagVal; +typedef struct STag STag; +typedef struct SColData SColData; typedef struct SRowKey SRowKey; typedef struct SValueColumn SValueColumn; @@ -57,9 +57,9 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111 #define ONE ((uint8_t)1) #define THREE ((uint8_t)3) #define DIV_8(i) ((i) >> 3) -#define MOD_8(i) ((i) & 7) +#define MOD_8(i) ((i)&7) #define DIV_4(i) ((i) >> 2) -#define MOD_4(i) ((i) & 3) +#define MOD_4(i) ((i)&3) #define MOD_4_TIME_2(i) (MOD_4(i) << 1) #define BIT1_SIZE(n) (DIV_8((n)-1) + 1) #define BIT2_SIZE(n) (DIV_4((n)-1) + 1) @@ -97,12 +97,12 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111 // SValueColumn ================================ typedef struct { - int8_t cmprAlg; // filled by caller - int8_t type; - int32_t dataOriginalSize; - int32_t dataCompressedSize; - int32_t offsetOriginalSize; - int32_t offsetCompressedSize; + uint32_t cmprAlg; // filled by caller + int8_t type; + int32_t dataOriginalSize; + int32_t dataCompressedSize; + int32_t offsetOriginalSize; + int32_t offsetCompressedSize; } SValueColumnCompressInfo; int32_t tValueColumnInit(SValueColumn *valCol); @@ -149,7 +149,7 @@ int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, voi // SColData ================================ typedef struct { - int8_t cmprAlg; // filled by caller + uint32_t cmprAlg; // filled by caller int8_t columnFlag; int8_t flag; int8_t dataType; @@ -338,10 +338,10 @@ struct SValueColumn { }; typedef struct { - int8_t dataType; // filled by caller - int8_t cmprAlg; // filled by caller - int32_t originalSize; // filled by caller - int32_t compressedSize; + int32_t dataType; // filled by caller + uint32_t cmprAlg; // filled by caller + int32_t originalSize; // filled by caller + int32_t compressedSize; } SCompressInfo; int32_t tCompressData(void *input, // input diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 741e3663db..f10f419b6f 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -268,9 +268,11 @@ typedef struct { #define IS_MATHABLE_TYPE(_t) \ (IS_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP)) -#define IS_VAR_DATA_TYPE(t) \ - (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || ((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY)) -#define IS_STR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR)) +#define IS_VAR_DATA_TYPE(t) \ + (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || \ + ((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY)) +#define IS_STR_DATA_TYPE(t) \ + (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR)) #define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX) #define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX) @@ -340,13 +342,28 @@ typedef struct tDataTypeDescriptor { int32_t nBuf); } tDataTypeDescriptor; +typedef struct tDataTypeCompress { + int16_t type; + int16_t nameLen; + int32_t bytes; + char *name; + int64_t minValue; + int64_t maxValue; + int32_t (*compFunc)(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); + int32_t (*decompFunc)(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +} tDataTypeCompress; + extern tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX]; -bool isValidDataType(int32_t type); +extern tDataTypeCompress tDataCompress[TSDB_DATA_TYPE_MAX]; + +bool isValidDataType(int32_t type); int32_t operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type); -void assignVal(char *val, const char *src, int32_t len, int32_t type); -void *getDataMin(int32_t type, void* value); -void *getDataMax(int32_t type, void* value); +void assignVal(char *val, const char *src, int32_t len, int32_t type); +void *getDataMin(int32_t type, void *value); +void *getDataMax(int32_t type, void *value); #ifdef __cplusplus } diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 8dd435022c..4c12a4b71a 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -145,6 +145,55 @@ int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelem bool bigEndian); int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian); +/************************************************************************* + * REGULAR COMPRESSION 2 + *************************************************************************/ +int32_t tsCompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf); +int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf); +int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsCompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsCompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsDecompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsCompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsDecompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf); +int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsDecompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf); +int32_t tsCompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf); +// for internal usage +int32_t getWordLength(char type); + +int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); +int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); +int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); +int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, + bool bigEndian); +int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian); + /************************************************************************* * STREAM COMPRESSION *************************************************************************/ diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index af124f7a02..a42620472d 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -4183,7 +4183,7 @@ int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *info, SBu uint8_t fmtVer = 0; if ((code = tBufferPutU8(buffer, fmtVer))) return code; - if ((code = tBufferPutI8(buffer, info->cmprAlg))) return code; + if ((code = tBufferPutU32(buffer, info->cmprAlg))) return code; if ((code = tBufferPutI8(buffer, info->type))) return code; if (IS_VAR_DATA_TYPE(info->type)) { if ((code = tBufferPutI32v(buffer, info->offsetOriginalSize))) return code; @@ -4201,7 +4201,7 @@ int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompre if ((code = tBufferGetU8(reader, &fmtVer))) return code; if (fmtVer == 0) { - if ((code = tBufferGetI8(reader, &info->cmprAlg))) return code; + if ((code = tBufferGetU32(reader, &info->cmprAlg))) return code; if ((code = tBufferGetI8(reader, &info->type))) return code; if (IS_VAR_DATA_TYPE(info->type)) { if ((code = tBufferGetI32v(reader, &info->offsetOriginalSize))) return code; @@ -4234,7 +4234,7 @@ int32_t tCompressData(void *input, // input if (info->cmprAlg == NO_COMPRESSION) { memcpy(output, input, info->originalSize); info->compressedSize = info->originalSize; - } else { + } else if (info->cmprAlg == TWO_STAGE_COMP) { SBuffer local; tBufferInit(&local); @@ -4266,6 +4266,8 @@ int32_t tCompressData(void *input, // input } tBufferDestroy(&local); + } else { + // new col compress } return 0; @@ -4284,7 +4286,7 @@ int32_t tDecompressData(void *input, // input if (info->cmprAlg == NO_COMPRESSION) { ASSERT(info->compressedSize == info->originalSize); memcpy(output, input, info->compressedSize); - } else { + } else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) { SBuffer local; tBufferInit(&local); @@ -4315,6 +4317,38 @@ int32_t tDecompressData(void *input, // input return TSDB_CODE_COMPRESS_ERROR; } + ASSERT(decompressedSize == info->originalSize); + tBufferDestroy(&local); + } else { + SBuffer local; + + tBufferInit(&local); + if (buffer == NULL) { + buffer = &local; + } + if (info->cmprAlg == TWO_STAGE_COMP) { + code = tBufferEnsureCapacity(buffer, info->originalSize + COMP_OVERFLOW_BYTES); + if (code) { + tBufferDestroy(&local); + return code; + } + } + + int32_t decompressedSize = tDataTypes[info->dataType].decompFunc( + input, // input + info->compressedSize, // inputSize + info->originalSize / tDataTypes[info->dataType].bytes, // number of elements + output, // output + outputSize, // output size + info->cmprAlg, // compression algorithm + buffer->data, // helper buffer + buffer->capacity // extra buffer size + ); + if (decompressedSize < 0) { + tBufferDestroy(&local); + return TSDB_CODE_COMPRESS_ERROR; + } + ASSERT(decompressedSize == info->originalSize); tBufferDestroy(&local); } diff --git a/source/common/src/ttypes.c b/source/common/src/ttypes.c index 8827ddc811..6f2ed496c3 100644 --- a/source/common/src/ttypes.c +++ b/source/common/src/ttypes.c @@ -61,13 +61,44 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = { {TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt, tsDecompressInt}, {TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint, tsDecompressBigint}, {TSDB_DATA_TYPE_JSON, 4, TSDB_MAX_JSON_TAG_LEN, "JSON", 0, 0, tsCompressString, tsDecompressString}, - {TSDB_DATA_TYPE_VARBINARY, 9, 1, "VARBINARY", 0, 0, tsCompressString, tsDecompressString}, // placeholder, not implemented + {TSDB_DATA_TYPE_VARBINARY, 9, 1, "VARBINARY", 0, 0, tsCompressString, + tsDecompressString}, // placeholder, not implemented {TSDB_DATA_TYPE_DECIMAL, 7, 1, "DECIMAL", 0, 0, NULL, NULL}, // placeholder, not implemented {TSDB_DATA_TYPE_BLOB, 4, 1, "BLOB", 0, 0, NULL, NULL}, // placeholder, not implemented {TSDB_DATA_TYPE_MEDIUMBLOB, 10, 1, "MEDIUMBLOB", 0, 0, NULL, NULL}, // placeholder, not implemented {TSDB_DATA_TYPE_GEOMETRY, 8, 1, "GEOMETRY", 0, 0, tsCompressString, tsDecompressString}, }; +tDataTypeCompress tDataCompress[TSDB_DATA_TYPE_MAX] = { + {TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", 0, 0, NULL, NULL}, + {TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", false, true, tsCompressBool2, tsDecompressBool2}, + {TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", INT8_MIN, INT8_MAX, tsCompressTinyint2, tsDecompressTinyint2}, + {TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", INT16_MIN, INT16_MAX, tsCompressSmallint2, + tsDecompressSmallint2}, + {TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", INT32_MIN, INT32_MAX, tsCompressInt2, tsDecompressInt2}, + {TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", INT64_MIN, INT64_MAX, tsCompressBigint2, tsDecompressBigint2}, + {TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", 0, 0, tsCompressFloat2, tsDecompressFloat2}, + {TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", 0, 0, tsCompressDouble2, tsDecompressDouble2}, + {TSDB_DATA_TYPE_VARCHAR, 6, 1, "VARCHAR", 0, 0, tsCompressString2, tsDecompressString2}, + {TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", INT64_MIN, INT64_MAX, tsCompressTimestamp2, + tsDecompressTimestamp2}, + {TSDB_DATA_TYPE_NCHAR, 5, 1, "NCHAR", 0, 0, tsCompressString2, tsDecompressString2}, + {TSDB_DATA_TYPE_UTINYINT, 16, CHAR_BYTES, "TINYINT UNSIGNED", 0, UINT8_MAX, tsCompressTinyint2, + tsDecompressTinyint2}, + {TSDB_DATA_TYPE_USMALLINT, 17, SHORT_BYTES, "SMALLINT UNSIGNED", 0, UINT16_MAX, tsCompressSmallint2, + tsDecompressSmallint2}, + {TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt2, tsDecompressInt2}, + {TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint2, tsDecompressBigint2}, + {TSDB_DATA_TYPE_JSON, 4, TSDB_MAX_JSON_TAG_LEN, "JSON", 0, 0, tsCompressString2, tsDecompressString2}, + {TSDB_DATA_TYPE_VARBINARY, 9, 1, "VARBINARY", 0, 0, tsCompressString2, + tsDecompressString2}, // placeholder, not implemented + {TSDB_DATA_TYPE_DECIMAL, 7, 1, "DECIMAL", 0, 0, NULL, NULL}, // placeholder, not implemented + {TSDB_DATA_TYPE_BLOB, 4, 1, "BLOB", 0, 0, NULL, NULL}, // placeholder, not implemented + {TSDB_DATA_TYPE_MEDIUMBLOB, 10, 1, "MEDIUMBLOB", 0, 0, NULL, NULL}, // placeholder, not implemented + {TSDB_DATA_TYPE_GEOMETRY, 8, 1, "GEOMETRY", 0, 0, tsCompressString2, tsDecompressString2}, + +}; + static float floatMin = -FLT_MAX, floatMax = FLT_MAX; static double doubleMin = -DBL_MAX, doubleMax = DBL_MAX; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index dfcaac0f15..e11d976814 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -176,7 +176,7 @@ int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid); int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); void tBlockDataClear(SBlockData *pBlockData); -int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers, SBuffer *assist); +int32_t tBlockDataCompress(SBlockData *bData, void *pCmprInfo, SBuffer *buffers, SBuffer *assist); int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist); int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData, SBuffer *assist); int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br, diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 9fdb4993bd..511670fe2d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -108,17 +108,17 @@ typedef struct SQueryNode SQueryNode; #define VNODE_METRIC_SQL_COUNT "taosd_sql_req:count" -#define VNODE_METRIC_TAG_NAME_SQL_TYPE "sql_type" +#define VNODE_METRIC_TAG_NAME_SQL_TYPE "sql_type" #define VNODE_METRIC_TAG_NAME_CLUSTER_ID "cluster_id" -#define VNODE_METRIC_TAG_NAME_DNODE_ID "dnode_id" -#define VNODE_METRIC_TAG_NAME_DNODE_EP "dnode_ep" -#define VNODE_METRIC_TAG_NAME_VGROUP_ID "vgroup_id" -#define VNODE_METRIC_TAG_NAME_USERNAME "username" -#define VNODE_METRIC_TAG_NAME_RESULT "result" +#define VNODE_METRIC_TAG_NAME_DNODE_ID "dnode_id" +#define VNODE_METRIC_TAG_NAME_DNODE_EP "dnode_ep" +#define VNODE_METRIC_TAG_NAME_VGROUP_ID "vgroup_id" +#define VNODE_METRIC_TAG_NAME_USERNAME "username" +#define VNODE_METRIC_TAG_NAME_RESULT "result" #define VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS "inserted_rows" -//#define VNODE_METRIC_TAG_VALUE_INSERT "insert" -//#define VNODE_METRIC_TAG_VALUE_DELETE "delete" +// #define VNODE_METRIC_TAG_VALUE_INSERT "insert" +// #define VNODE_METRIC_TAG_VALUE_DELETE "delete" // vnd.h typedef int32_t (*_query_reseek_func_t)(void* pQHandle); @@ -461,12 +461,12 @@ typedef struct SVCommitSched { int64_t maxWaitMs; } SVCommitSched; -typedef struct SVMonitorObj{ - char strClusterId[TSDB_CLUSTER_ID_LEN]; - char strDnodeId[TSDB_NODE_ID_LEN]; - char strVgId[TSDB_VGROUP_ID_LEN]; - taos_counter_t *insertCounter; -}SVMonitorObj; +typedef struct SVMonitorObj { + char strClusterId[TSDB_CLUSTER_ID_LEN]; + char strDnodeId[TSDB_NODE_ID_LEN]; + char strVgId[TSDB_VGROUP_ID_LEN]; + taos_counter_t* insertCounter; +} SVMonitorObj; struct SVnode { char* path; diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 0da07077df..224bcb6db6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -14,6 +14,7 @@ */ #include "tsdbDataFileRW.h" +#include "meta.h" // SDataFileReader ============================================= struct SDataFileReader { @@ -878,6 +879,8 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData SBuffer *buffers = writer->buffers; SBuffer *assist = writer->buffers + 4; + SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = writer->config->cmprAlg}; + SBrinRecord record[1] = {{ .suid = bData->suid, .uid = bData->uid, @@ -909,8 +912,10 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer); - // to .data file - code = tBlockDataCompress(bData, writer->config->cmprAlg, buffers, assist); + code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, bData->suid != 0 ? bData->suid : bData->uid, + &cmprInfo.pColCmpr); + + code = tBlockDataCompress(bData, &cmprInfo, buffers, assist); TSDB_CHECK_CODE(code, lino, _exit); record->blockKeySize = buffers[0].size + buffers[1].size; @@ -953,6 +958,8 @@ _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } + taosHashCleanup(cmprInfo.pColCmpr); + return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbDef.h b/source/dnode/vnode/src/tsdb/tsdbDef.h index 0eaf3e68a6..3d62712e02 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDef.h +++ b/source/dnode/vnode/src/tsdb/tsdbDef.h @@ -38,6 +38,13 @@ extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t extern int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint); extern int32_t tsdbFsyncFile(STsdbFD *pFD); +typedef struct SColCompressInfo SColCompressInfo; +struct SColCompressInfo { + SHashObj *pColCmpr; + int32_t defaultCmprAlg; +}; + +// int32_t tsdbGetCompressByUid(void *meta, tb_uid_t uid, struct SColCompressInfo *info); #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbFSetRW.c b/source/dnode/vnode/src/tsdb/tsdbFSetRW.c index b0917cceb0..f301877a04 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSetRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSetRW.c @@ -299,3 +299,15 @@ _exit: } return code; } +// int32_t tsdbGetCompressByUid(SFSetWriter *writer, tb_uid_t uid, struct SColCompressInfo *info) { +// SHashObj *p = NULL; +// int32_t code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, uid, &p); +// if (code < 0) { +// ASSERT(0); +// taosHashCleanup(p); +// p = NULL; +// } else { +// } +// info->pColCmpr = p; +// return code; +// } diff --git a/source/dnode/vnode/src/tsdb/tsdbFSetRW.h b/source/dnode/vnode/src/tsdb/tsdbFSetRW.h index 815b956cf4..0a8049cded 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSetRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSetRW.h @@ -41,7 +41,6 @@ typedef struct { bool exist; STFile file; } files[TSDB_FTYPE_MAX]; - SHashObj *pColCmpr; } SFSetWriterConfig; int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index dece1c7b98..f1f3cdf236 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -247,7 +247,10 @@ static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) { int32_t code = 0; int32_t lino = 0; - code = tBlockDataCompress(reader->blockData, NO_COMPRESSION, reader->buffers, reader->buffers + 4); + SColCompressInfo info; + + SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION}; + code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + 4); TSDB_CHECK_CODE(code, lino, _exit); // TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index ab503ed440..25d54c3e45 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -14,6 +14,7 @@ */ #include "tsdbSttFileRW.h" +#include "meta.h" #include "tsdbDataFileRW.h" // SSttFReader ============================================================ @@ -464,7 +465,9 @@ static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, i tsdbWriterUpdVerRange(range, sttBlk->minVer, sttBlk->maxVer); - code = tBlockDataCompress(blockData, cmprAlg, buffers, buffers + 4); + SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = cmprAlg}; + code = tBlockDataCompress(blockData, &cmprInfo, buffers, buffers + 4); + if (code) return code; sttBlk->bInfo.offset = *fileSize; sttBlk->bInfo.szKey = buffers[0].size + buffers[1].size; @@ -492,6 +495,10 @@ static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; + // SHashObj *colCmpr = NULL; + // tb_uid_t uid = writer->blockData->suid == 0 ? writer->blockData->uid : writer->blockData->suid; + // code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, uid, &colCmpr); + code = tsdbFileDoWriteSttBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size, writer->sttBlkArray, writer->buffers, &writer->ctx->range); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 95fe189d18..db842fd57d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -15,8 +15,12 @@ #include "tdataformat.h" #include "tsdb.h" +#include "tsdbDef.h" -static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist); +int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg); + +static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist, + SColCompressInfo *pCompressExt); // SMapData ======================================================================= void tMapDataReset(SMapData *pMapData) { @@ -1403,10 +1407,12 @@ SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid) { * buffers[2]: SBlockCol part * buffers[3]: regular column part */ -int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers, SBuffer *assist) { +int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SBuffer *assist) { int32_t code = 0; int32_t lino = 0; + SColCompressInfo *pInfo = pCompr; + SDiskDataHdr hdr = { .delimiter = TSDB_FILE_DLMT, .fmtVer = 1, @@ -1417,13 +1423,12 @@ int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers, .szKey = 0, // filled by compress key .szBlkCol = 0, // filled by this func .nRow = bData->nRow, - .cmprAlg = cmprAlg, + .cmprAlg = pInfo->defaultCmprAlg, .numOfPKs = 0, // filled by compress key }; - // Key part tBufferClear(&buffers[1]); - code = tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist); + code = tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist, (SColCompressInfo *)pInfo); TSDB_CHECK_CODE(code, lino, _exit); // Regulart column part @@ -1440,23 +1445,27 @@ int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers, } SColDataCompressInfo cinfo = { - .cmprAlg = cmprAlg, + .cmprAlg = pInfo->defaultCmprAlg, }; + code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, colData->cid, &cinfo.cmprAlg); + if (code < 0) { + // + } + int32_t offset = buffers[3].size; code = tColDataCompress(colData, &cinfo, &buffers[3], assist); TSDB_CHECK_CODE(code, lino, _exit); - SBlockCol blockCol = (SBlockCol){ - .cid = cinfo.columnId, - .type = cinfo.dataType, - .cflag = cinfo.columnFlag, - .flag = cinfo.flag, - .szOrigin = cinfo.dataOriginalSize, - .szBitmap = cinfo.bitmapCompressedSize, - .szOffset = cinfo.offsetCompressedSize, - .szValue = cinfo.dataCompressedSize, - .offset = offset, - }; + SBlockCol blockCol = (SBlockCol){.cid = cinfo.columnId, + .type = cinfo.dataType, + .cflag = cinfo.columnFlag, + .flag = cinfo.flag, + .szOrigin = cinfo.dataOriginalSize, + .szBitmap = cinfo.bitmapCompressedSize, + .szOffset = cinfo.offsetCompressedSize, + .szValue = cinfo.dataCompressedSize, + .offset = offset, + .alg = cinfo.cmprAlg}; code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer); TSDB_CHECK_CODE(code, lino, _exit); @@ -1581,7 +1590,8 @@ int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg) { return 0; } -static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist) { +static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist, + SColCompressInfo *compressInfo) { int32_t code = 0; int32_t lino = 0; SCompressInfo cinfo; @@ -1632,6 +1642,12 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S SColDataCompressInfo info = { .cmprAlg = hdr->cmprAlg, }; + code = tsdbGetColCmprAlgFromSet(compressInfo->pColCmpr, colData->cid, &info.cmprAlg); + if (code < 0) { + // do nothing + } else { + } + code = tColDataCompress(colData, &info, buffer, assist); TSDB_CHECK_CODE(code, lino, _exit); @@ -1645,6 +1661,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S .szOffset = info.offsetCompressedSize, .szValue = info.dataCompressedSize, .offset = 0, + .alg = info.cmprAlg, }; } @@ -1665,7 +1682,7 @@ int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *bl // ASSERT(blockCol->flag != HAS_NONE); SColDataCompressInfo info = { - .cmprAlg = hdr->cmprAlg, + .cmprAlg = blockCol->alg, .columnFlag = blockCol->cflag, .flag = blockCol->flag, .dataType = blockCol->type, @@ -1759,4 +1776,14 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, _exit: return code; +} + +int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg) { + if (set == NULL) return -1; + + uint32_t *ret = taosHashGet(set, &colId, sizeof(colId)); + if (ret == NULL) return -1; + + *alg = *ret; + return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.c b/source/dnode/vnode/src/tsdb/tsdbUtil2.c index d8e93cbe7c..6df0528f1c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.c @@ -475,4 +475,4 @@ int32_t tsdbUpdateSkmRow(STsdb *pTsdb, const TABLEID *tbid, int32_t sver, SSkmIn tDestroyTSchema(pSkmRow->pTSchema); return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, tbid->suid, tbid->uid, sver, &pSkmRow->pTSchema); } -int32_t tsdbUpdateColCmprObj(STsdb *pTsdb, const TABLEID *tbid, SHashObj **ppColCmpr) { return 0; } \ No newline at end of file +int32_t tsdbUpdateColCmprObj(STsdb *pTsdb, const TABLEID *tbid, SHashObj **ppColCmpr) { return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.h b/source/dnode/vnode/src/tsdb/tsdbUtil2.h index 71f47a5f8e..e140194f9a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.h +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.h @@ -169,7 +169,7 @@ typedef struct { int64_t maxVer; int32_t numRec; int32_t size[15]; - int8_t cmprAlg; + uint32_t cmprAlg; int8_t numOfPKs; // number of primary keys int8_t rsvd[6]; } SBrinBlk; diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 8b5f268f81..e10b4f7938 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -2395,6 +2395,316 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int } } +#define DEFINE_VAR(cmprAlg) \ + uint8_t l1 = COMPRESS_L1_TYPE_U32(cmprAlg); \ + uint8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg); \ + uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg); +/************************************************************************* + * REGULAR COMPRESSION 2 + *************************************************************************/ +// Timestamp ===================================================== +int32_t tsCompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf) { + DEFINE_VAR(cmprAlg) + + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { + return tsCompressTimestampImp(pIn, nEle, pOut); + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { + int32_t len = tsCompressTimestampImp(pIn, nEle, pBuf); + return tsCompressStringImp(pBuf, len, pOut, nOut); + } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { + ASSERT(0); + } else if (l1 == L1_DISABLED && l2 == L2_DISABLED) { + ASSERT(0); + } + return -1; +} + +int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf) { + DEFINE_VAR(cmprAlg) + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { + return tsDecompressTimestampImp(pIn, nEle, pOut); + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { + int32_t len = tsDecompressStringImp(pIn, nIn, pBuf, nBuf); + if (len < 0) return -1; + return tsDecompressTimestampImp(pBuf, nEle, pOut); + } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { + ASSERT(0); + } else if (l1 == L1_DISABLED && l2 == L2_DISABLED) { + ASSERT(0); + } + + return 0; +} + +// Float ===================================================== +int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf) { + return 0; + // #ifdef TD_TSZ + // // lossy mode + // if (lossyFloat) { + // return tsCompressFloatLossyImp(pIn, nEle, pOut); + // // lossless mode + // } else { + // #endif + // if (cmprAlg == ONE_STAGE_COMP) { + // return tsCompressFloatImp(pIn, nEle, pOut); + // } else if (cmprAlg == TWO_STAGE_COMP) { + // int32_t len = tsCompressFloatImp(pIn, nEle, pBuf); + // return tsCompressStringImp(pBuf, len, pOut, nOut); + // } else { + // ASSERTS(0, "compress algo invalid"); + // return -1; + // } + // #ifdef TD_TSZ + // } + // #endif +} + +int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf) { + return 0; + // #ifdef TD_TSZ + // if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) { + // // decompress lossy + // return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut); + // } else { + // #endif + // // decompress lossless + // if (cmprAlg == ONE_STAGE_COMP) { + // return tsDecompressFloatImp(pIn, nEle, pOut); + // } else if (cmprAlg == TWO_STAGE_COMP) { + // if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + // return tsDecompressFloatImp(pBuf, nEle, pOut); + // } else { + // ASSERTS(0, "compress algo invalid"); + // return -1; + // } + // #ifdef TD_TSZ + // } + // #endif +} + +// Double ===================================================== +int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf) { + return 0; + // #ifdef TD_TSZ + // if (lossyDouble) { + // // lossy mode + // return tsCompressDoubleLossyImp(pIn, nEle, pOut); + // } else { + // #endif + // // lossless mode + // if (cmprAlg == ONE_STAGE_COMP) { + // return tsCompressDoubleImp(pIn, nEle, pOut); + // } else if (cmprAlg == TWO_STAGE_COMP) { + // int32_t len = tsCompressDoubleImp(pIn, nEle, pBuf); + // return tsCompressStringImp(pBuf, len, pOut, nOut); + // } else { + // ASSERTS(0, "compress algo invalid"); + // return -1; + // } + // #ifdef TD_TSZ + // } + // #endif +} + +int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf) { + return 0; + // #ifdef TD_TSZ + // if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) { + // // decompress lossy + // return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut); + // } else { + // #endif + // // decompress lossless + // if (cmprAlg == ONE_STAGE_COMP) { + // return tsDecompressDoubleImp(pIn, nEle, pOut); + // } else if (cmprAlg == TWO_STAGE_COMP) { + // if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + // return tsDecompressDoubleImp(pBuf, nEle, pOut); + // } else { + // ASSERTS(0, "compress algo invalid"); + // return -1; + // } + // #ifdef TD_TSZ + // } + // #endif +} + +// Binary ===================================================== +int32_t tsCompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf) { + return tsCompressStringImp(pIn, nIn, pOut, nOut); +} + +int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf) { + // return 0; + return tsDecompressStringImp(pIn, nIn, pOut, nOut); +} + +// Bool ===================================================== +int32_t tsCompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf) { + DEFINE_VAR(cmprAlg) + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { + return tsCompressBoolImp(pIn, nEle, pOut); + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { + int32_t len = tsCompressBoolImp(pIn, nIn, pBuf); + if (len < 0) return -1; + return tsCompressStringImp(pBuf, len, pOut, nOut); + } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { + ASSERT(0); + } else if (l1 == L1_DISABLED && l2 == L2_DISABLED) { + ASSERT(0); + } + return -1; +} + +int32_t tsDecompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf) { + DEFINE_VAR(cmprAlg) + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { + return tsDecompressBoolImp(pIn, nEle, pOut); + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { + if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + return tsDecompressBoolImp(pBuf, nEle, pOut); + } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { + return -1; + } else if (l2 == L1_DISABLED && l2 == L2_DISABLED) { + return -1; + } + return -1; +} + +// Tinyint ===================================================== +int32_t tsCompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf) { + return 0; + // if (cmprAlg == ONE_STAGE_COMP) { + // return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT); + // } else if (cmprAlg == TWO_STAGE_COMP) { + // int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_TINYINT); + // return tsCompressStringImp(pBuf, len, pOut, nOut); + // } else { + // ASSERTS(0, "compress algo invalid"); + // return -1; + // } +} + +int32_t tsDecompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf) { + return 0; + // if (cmprAlg == ONE_STAGE_COMP) { + // return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT); + // } else if (cmprAlg == TWO_STAGE_COMP) { + // if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + // return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_TINYINT); + // } else { + // ASSERTS(0, "compress algo invalid"); + // return -1; + // } +} + +// Smallint ===================================================== +int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf) { + return 0; + // if (cmprAlg == ONE_STAGE_COMP) { + // return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT); + // } else if (cmprAlg == TWO_STAGE_COMP) { + // int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_SMALLINT); + // return tsCompressStringImp(pBuf, len, pOut, nOut); + // } else { + // ASSERTS(0, "compress algo invalid"); + // return -1; + // } +} + +int32_t tsDecompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf) { + return 0; + // if (cmprAlg == ONE_STAGE_COMP) { + // return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT); + // } else if (cmprAlg == TWO_STAGE_COMP) { + // if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + // return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_SMALLINT); + // } else { + // ASSERTS(0, "compress algo invalid"); + // return -1; + // } +} + +// Int ===================================================== +int32_t tsCompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf) { + DEFINE_VAR(cmprAlg) + + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { + return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT); + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { + int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_INT); + return tsCompressStringImp(pBuf, len, pOut, nOut); + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { + ASSERTS(0, "compress algo invalid"); + return -1; + } else if (l1 != L1_DISABLED && l2 == L2_DISABLED) { + ASSERTS(0, "compress algo invalid"); + return -1; + } + return -1; +} + +int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf) { + DEFINE_VAR(cmprAlg) + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { + return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT); + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { + if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_INT); + } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { + ASSERTS(0, "compress algo invalid"); + return -1; + } else if (l1 == L1_DISABLED && l2 == L2_DISABLED) { + return -1; + } + return -1; +} + +// Bigint ===================================================== +int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, + int32_t nBuf) { + return 0; + // if (cmprAlg == ONE_STAGE_COMP) { + // return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT); + // } else if (cmprAlg == TWO_STAGE_COMP) { + // int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_BIGINT); + // return tsCompressStringImp(pBuf, len, pOut, nOut); + // } else { + // ASSERTS(0, "compress algo invalid"); + // return -1; + // } +} + +int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf) { + return 0; + // if (cmprAlg == ONE_STAGE_COMP) { + // return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT); + // } else if (cmprAlg == TWO_STAGE_COMP) { + // if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + // return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT); + // } else { + // ASSERTS(0, "compress algo invalid"); + // return -1; + // } +} int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn); int32_t tsCompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,