From efacafde33d065c9942ad58d02d100b4d03f1afa Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 26 Sep 2022 10:47:24 +0800 Subject: [PATCH] refact more code --- include/util/tcompression.h | 4 +- source/dnode/vnode/src/tsdb/tsdbDiskData.c | 20 +- source/util/src/tcompression.c | 316 ++++++++++++++------- 3 files changed, 219 insertions(+), 121 deletions(-) diff --git a/include/util/tcompression.h b/include/util/tcompression.h index f22205d144..8f3d9634ec 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -130,8 +130,8 @@ typedef struct SCompressor SCompressor; int32_t tCompressorCreate(SCompressor **ppCmprsor); int32_t tCompressorDestroy(SCompressor *pCmprsor); -int32_t tCompressorInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); -int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); +int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); +int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/tsdbDiskData.c b/source/dnode/vnode/src/tsdb/tsdbDiskData.c index 2c1d6d2149..9e7e4cefe0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDiskData.c +++ b/source/dnode/vnode/src/tsdb/tsdbDiskData.c @@ -65,12 +65,12 @@ static int32_t tDiskColBuilderInit(SDiskColBuilder *pBuilder, int16_t cid, int8_ if (IS_VAR_DATA_TYPE(type)) { if (pBuilder->pOffC == NULL && (code = tCompressorCreate(&pBuilder->pOffC))) return code; - code = tCompressorInit(pBuilder->pOffC, TSDB_DATA_TYPE_INT, cmprAlg); + code = tCompressStart(pBuilder->pOffC, TSDB_DATA_TYPE_INT, cmprAlg); if (code) return code; } if (pBuilder->pValC == NULL && (code = tCompressorCreate(&pBuilder->pValC))) return code; - code = tCompressorInit(pBuilder->pValC, type, cmprAlg); + code = tCompressStart(pBuilder->pValC, type, cmprAlg); if (code) return code; if (pBuilder->calcSma) { @@ -116,13 +116,13 @@ static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SDiskCol *pDiskCol) { // OFFSET if (IS_VAR_DATA_TYPE(pBuilder->type)) { - code = tCompGen(pBuilder->pOffC, &pDiskCol->pOff, &pDiskCol->bCol.szOffset); + code = tCompressEnd(pBuilder->pOffC, &pDiskCol->pOff, &pDiskCol->bCol.szOffset); if (code) return code; } // VALUE if (pBuilder->flag != (HAS_NULL | HAS_NONE)) { - code = tCompGen(pBuilder->pValC, &pDiskCol->pVal, &pDiskCol->bCol.szValue); + code = tCompressEnd(pBuilder->pValC, &pDiskCol->pVal, &pDiskCol->bCol.szValue); if (code) return code; } @@ -457,15 +457,15 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB pBuilder->calcSma = calcSma; if (pBuilder->pUidC == NULL && (code = tCompressorCreate(&pBuilder->pUidC))) return code; - code = tCompressorInit(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg); + code = tCompressStart(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg); if (code) return code; if (pBuilder->pVerC == NULL && (code = tCompressorCreate(&pBuilder->pVerC))) return code; - code = tCompressorInit(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg); + code = tCompressStart(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg); if (code) return code; if (pBuilder->pKeyC == NULL && (code = tCompressorCreate(&pBuilder->pKeyC))) return code; - code = tCompressorInit(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg); + code = tCompressStart(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg); if (code) return code; if (pBuilder->aBuilder == NULL) { @@ -586,16 +586,16 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) { // UID if (pBuilder->uid == 0) { - code = tCompGen(pBuilder->pUidC, &pDiskData->pUid, &pDiskData->hdr.szUid); + code = tCompressEnd(pBuilder->pUidC, &pDiskData->pUid, &pDiskData->hdr.szUid); if (code) return code; } // VERSION - code = tCompGen(pBuilder->pVerC, &pDiskData->pVer, &pDiskData->hdr.szVer); + code = tCompressEnd(pBuilder->pVerC, &pDiskData->pVer, &pDiskData->hdr.szVer); if (code) return code; // TSKEY - code = tCompGen(pBuilder->pKeyC, &pDiskData->pKey, &pDiskData->hdr.szKey); + code = tCompressEnd(pBuilder->pKeyC, &pDiskData->pKey, &pDiskData->hdr.szKey); if (code) return code; int32_t offset = 0; diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 3a6dbd4acc..f1d9b860b8 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -1001,49 +1001,157 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co *************************************************************************/ #define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) -static int32_t tCompBoolInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); +static int32_t tCompBoolStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData); -static int32_t tCompIntInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); +static int32_t tCompBoolEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); + +static int32_t tCompIntStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData); -static int32_t tCompFloatInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); +static int32_t tCompIntEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); + +static int32_t tCompFloatStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData); -static int32_t tCompDoubleInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); +static int32_t tCompFloatEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); + +static int32_t tCompDoubleStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData); -static int32_t tCompTimestampInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); +static int32_t tCompDoubleEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); + +static int32_t tCompTimestampStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData); -static int32_t tCompBinaryInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); +static int32_t tCompTimestampEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); + +static int32_t tCompBinaryStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData); +static int32_t tCompBinaryEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); static struct { int8_t type; int32_t bytes; int8_t isVarLen; - int32_t (*initFn)(SCompressor *, int8_t type, int8_t cmprAlg); + int32_t (*startFn)(SCompressor *, int8_t type, int8_t cmprAlg); int32_t (*cmprFn)(SCompressor *, const void *, int32_t nData); + int32_t (*endFn)(SCompressor *, const uint8_t **, int32_t *); } DATA_TYPE_INFO[] = { - {.type = TSDB_DATA_TYPE_NULL, .bytes = 0, .isVarLen = 0, .initFn = NULL, .cmprFn = NULL}, // TSDB_DATA_TYPE_NULL - {.type = TSDB_DATA_TYPE_BOOL, .bytes = 1, .isVarLen = 0, .initFn = tCompBoolInit, .cmprFn = tCompBool}, - {.type = TSDB_DATA_TYPE_TINYINT, .bytes = 1, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, - {.type = TSDB_DATA_TYPE_SMALLINT, .bytes = 2, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, - {.type = TSDB_DATA_TYPE_INT, .bytes = 4, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, - {.type = TSDB_DATA_TYPE_BIGINT, .bytes = 8, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, - {.type = TSDB_DATA_TYPE_FLOAT, .bytes = 4, .isVarLen = 0, .initFn = tCompFloatInit, .cmprFn = tCompFloat}, - {.type = TSDB_DATA_TYPE_DOUBLE, .bytes = 8, .isVarLen = 0, .initFn = tCompDoubleInit, .cmprFn = tCompDouble}, - {.type = TSDB_DATA_TYPE_VARCHAR, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, + {.type = TSDB_DATA_TYPE_NULL, + .bytes = 0, + .isVarLen = 0, + .startFn = NULL, + .cmprFn = NULL, + .endFn = NULL}, // TSDB_DATA_TYPE_NULL + {.type = TSDB_DATA_TYPE_BOOL, + .bytes = 1, + .isVarLen = 0, + .startFn = tCompBoolStart, + .cmprFn = tCompBool, + .endFn = tCompBoolEnd}, + {.type = TSDB_DATA_TYPE_TINYINT, + .bytes = 1, + .isVarLen = 0, + .startFn = tCompIntStart, + .cmprFn = tCompInt, + .endFn = tCompIntEnd}, + {.type = TSDB_DATA_TYPE_SMALLINT, + .bytes = 2, + .isVarLen = 0, + .startFn = tCompIntStart, + .cmprFn = tCompInt, + .endFn = tCompIntEnd}, + {.type = TSDB_DATA_TYPE_INT, + .bytes = 4, + .isVarLen = 0, + .startFn = tCompIntStart, + .cmprFn = tCompInt, + .endFn = tCompIntEnd}, + {.type = TSDB_DATA_TYPE_BIGINT, + .bytes = 8, + .isVarLen = 0, + .startFn = tCompIntStart, + .cmprFn = tCompInt, + .endFn = tCompIntEnd}, + {.type = TSDB_DATA_TYPE_FLOAT, + .bytes = 4, + .isVarLen = 0, + .startFn = tCompFloatStart, + .cmprFn = tCompFloat, + .endFn = tCompFloatEnd}, + {.type = TSDB_DATA_TYPE_DOUBLE, + .bytes = 8, + .isVarLen = 0, + .startFn = tCompDoubleStart, + .cmprFn = tCompDouble, + .endFn = tCompDoubleEnd}, + {.type = TSDB_DATA_TYPE_VARCHAR, + .bytes = 1, + .isVarLen = 1, + .startFn = tCompBinaryStart, + .cmprFn = tCompBinary, + .endFn = tCompBinaryEnd}, {.type = TSDB_DATA_TYPE_TIMESTAMP, .bytes = 8, .isVarLen = 0, - .initFn = tCompTimestampInit, - .cmprFn = tCompTimestamp}, - {.type = TSDB_DATA_TYPE_NCHAR, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, - {.type = TSDB_DATA_TYPE_UTINYINT, .bytes = 1, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, - {.type = TSDB_DATA_TYPE_USMALLINT, .bytes = 2, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, - {.type = TSDB_DATA_TYPE_UINT, .bytes = 4, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, - {.type = TSDB_DATA_TYPE_UBIGINT, .bytes = 8, .isVarLen = 0, .initFn = tCompIntInit, .cmprFn = tCompInt}, - {.type = TSDB_DATA_TYPE_JSON, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, - {.type = TSDB_DATA_TYPE_VARBINARY, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, - {.type = TSDB_DATA_TYPE_DECIMAL, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, - {.type = TSDB_DATA_TYPE_BLOB, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, - {.type = TSDB_DATA_TYPE_MEDIUMBLOB, .bytes = 1, .isVarLen = 1, .initFn = tCompBinaryInit, .cmprFn = tCompBinary}, + .startFn = tCompTimestampStart, + .cmprFn = tCompTimestamp, + .endFn = tCompTimestampEnd}, + {.type = TSDB_DATA_TYPE_NCHAR, + .bytes = 1, + .isVarLen = 1, + .startFn = tCompBinaryStart, + .cmprFn = tCompBinary, + .endFn = tCompBinaryEnd}, + {.type = TSDB_DATA_TYPE_UTINYINT, + .bytes = 1, + .isVarLen = 0, + .startFn = tCompIntStart, + .cmprFn = tCompInt, + .endFn = tCompIntEnd}, + {.type = TSDB_DATA_TYPE_USMALLINT, + .bytes = 2, + .isVarLen = 0, + .startFn = tCompIntStart, + .cmprFn = tCompInt, + .endFn = tCompIntEnd}, + {.type = TSDB_DATA_TYPE_UINT, + .bytes = 4, + .isVarLen = 0, + .startFn = tCompIntStart, + .cmprFn = tCompInt, + .endFn = tCompIntEnd}, + {.type = TSDB_DATA_TYPE_UBIGINT, + .bytes = 8, + .isVarLen = 0, + .startFn = tCompIntStart, + .cmprFn = tCompInt, + .endFn = tCompIntEnd}, + {.type = TSDB_DATA_TYPE_JSON, + .bytes = 1, + .isVarLen = 1, + .startFn = tCompBinaryStart, + .cmprFn = tCompBinary, + .endFn = tCompBinaryEnd}, + {.type = TSDB_DATA_TYPE_VARBINARY, + .bytes = 1, + .isVarLen = 1, + .startFn = tCompBinaryStart, + .cmprFn = tCompBinary, + .endFn = tCompBinaryEnd}, + {.type = TSDB_DATA_TYPE_DECIMAL, + .bytes = 1, + .isVarLen = 1, + .startFn = tCompBinaryStart, + .cmprFn = tCompBinary, + .endFn = tCompBinaryEnd}, + {.type = TSDB_DATA_TYPE_BLOB, + .bytes = 1, + .isVarLen = 1, + .startFn = tCompBinaryStart, + .cmprFn = tCompBinary, + .endFn = tCompBinaryEnd}, + {.type = TSDB_DATA_TYPE_MEDIUMBLOB, + .bytes = 1, + .isVarLen = 1, + .startFn = tCompBinaryStart, + .cmprFn = tCompBinary, + .endFn = tCompBinaryEnd}, }; struct SCompressor { @@ -1083,9 +1191,13 @@ struct SCompressor { }; // Timestamp ===================================================== -static int32_t tCompTimestampInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { +static int32_t tCompTimestampStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { int32_t code = 0; - // TODO + pCmprsor->ts_prev_val = 0; + pCmprsor->ts_prev_delta = 0; + pCmprsor->ts_flag_p = NULL; + pCmprsor->aBuf[0][0] = 1; + pCmprsor->nBuf[0] = 1; return code; } @@ -1215,6 +1327,12 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t return code; } +static int32_t tCompTimestampEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { + int32_t code = 0; + // TODO + return code; +} + // Integer ===================================================== #define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) static const uint8_t BIT_PER_INTEGER[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; @@ -1224,9 +1342,14 @@ static const uint8_t BIT_TO_SELECTOR[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; -static int32_t tCompIntInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { +static int32_t tCompIntStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { int32_t code = 0; - // TODO + pCmprsor->i_prev = 0; + pCmprsor->i_selector = 0; + pCmprsor->i_start = 0; + pCmprsor->i_end = 0; + pCmprsor->aBuf[0][0] = 0; + pCmprsor->nBuf[0] = 1; return code; } @@ -1336,13 +1459,22 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) return code; } -// Float ===================================================== -static int32_t tCompFloatInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { +static int32_t tCompIntEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t code = 0; // TODO return code; } +// Float ===================================================== +static int32_t tCompFloatStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { + int32_t code = 0; + pCmprsor->f_prev = 0; + pCmprsor->f_flag_p = NULL; + pCmprsor->aBuf[0][0] = 0; + pCmprsor->nBuf[0] = 1; + return code; +} + static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; @@ -1405,13 +1537,22 @@ static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nDat return code; } -// Double ===================================================== -static int32_t tCompDoubleInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { +static int32_t tCompFloatEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t code = 0; // TODO return code; } +// Double ===================================================== +static int32_t tCompDoubleStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { + int32_t code = 0; + pCmprsor->d_prev = 0; + pCmprsor->d_flag_p = NULL; + pCmprsor->aBuf[0][0] = 0; + pCmprsor->nBuf[0] = 1; + return code; +} + static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; @@ -1474,13 +1615,19 @@ static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nDa return code; } -// Binary ===================================================== -static int32_t tCompBinaryInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { +static int32_t tCompDoubleEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t code = 0; // TODO return code; } +// Binary ===================================================== +static int32_t tCompBinaryStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { + int32_t code = 0; + pCmprsor->nBuf[0] = 0; + return code; +} + static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; @@ -1498,12 +1645,18 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nDa return code; } +static int32_t tCompBinaryEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { + int32_t code = 0; + // TODO + return code; +} + // Bool ===================================================== static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; -static int32_t tCompBoolInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { +static int32_t tCompBoolStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { int32_t code = 0; - // TODO + pCmprsor->nBuf[0] = 0; return code; } @@ -1531,6 +1684,12 @@ static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData return code; } +static int32_t tCompBoolEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { + int32_t code = 0; + // TODO + return code; +} + // SCompressor ===================================================== int32_t tCompressorCreate(SCompressor **ppCmprsor) { int32_t code = 0; @@ -1565,7 +1724,7 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) { return code; } -int32_t tCompressorInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { +int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { int32_t code = 0; pCmprsor->type = type; @@ -1573,82 +1732,21 @@ int32_t tCompressorInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { pCmprsor->autoAlloc = 1; pCmprsor->nVal = 0; - switch (type) { - case TSDB_DATA_TYPE_TIMESTAMP: - pCmprsor->ts_prev_val = 0; - pCmprsor->ts_prev_delta = 0; - pCmprsor->ts_flag_p = NULL; - pCmprsor->aBuf[0][0] = 1; // For timestamp, 1 means compressed, 0 otherwise - pCmprsor->nBuf[0] = 1; - break; - case TSDB_DATA_TYPE_BOOL: - pCmprsor->nBuf[0] = 0; - break; - case TSDB_DATA_TYPE_BINARY: - pCmprsor->nBuf[0] = 0; - break; - case TSDB_DATA_TYPE_FLOAT: - pCmprsor->f_prev = 0; - pCmprsor->f_flag_p = NULL; - pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) - pCmprsor->nBuf[0] = 1; - break; - case TSDB_DATA_TYPE_DOUBLE: - pCmprsor->d_prev = 0; - pCmprsor->d_flag_p = NULL; - pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) - pCmprsor->nBuf[0] = 1; - break; - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_UBIGINT: - pCmprsor->i_prev = 0; - pCmprsor->i_selector = 0; - pCmprsor->i_start = 0; - pCmprsor->i_end = 0; - pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) - pCmprsor->nBuf[0] = 1; - break; - default: - break; + if (DATA_TYPE_INFO[type].startFn) { + DATA_TYPE_INFO[type].startFn(pCmprsor, type, cmprAlg); } return code; } -int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { +int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t code = 0; - if (pCmprsor->nVal == 0) { - *ppData = NULL; - *nData = 0; - return code; - } + *ppData = NULL; + *nData = 0; - if (pCmprsor->cmprAlg == TWO_STAGE_COMP /*|| IS_VAR_DATA_TYPE(pCmprsor->type)*/) { - code = tRealloc(&pCmprsor->aBuf[1], pCmprsor->nBuf[0] + 1); - if (code) return code; - - int64_t ret = LZ4_compress_default(pCmprsor->aBuf[0], pCmprsor->aBuf[1] + 1, pCmprsor->nBuf[0], pCmprsor->nBuf[0]); - if (ret) { - pCmprsor->aBuf[1][0] = 0; - pCmprsor->nBuf[1] = ret + 1; - } else { - pCmprsor->aBuf[1][0] = 1; - memcpy(pCmprsor->aBuf[1] + 1, pCmprsor->aBuf[0], pCmprsor->nBuf[0]); - pCmprsor->nBuf[1] = pCmprsor->nBuf[0] + 1; - } - - *ppData = pCmprsor->aBuf[1]; - *nData = pCmprsor->nBuf[1]; - } else { - *ppData = pCmprsor->aBuf[0]; - *nData = pCmprsor->nBuf[0]; + if (DATA_TYPE_INFO[pCmprsor->type].endFn) { + return DATA_TYPE_INFO[pCmprsor->type].endFn(pCmprsor, ppData, nData); } return code;