From 8884c1ca77edfa3962b8c3514fa4a1d88cbca365 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 27 Feb 2024 15:10:42 +0800 Subject: [PATCH] more code --- include/common/tdataformat.h | 23 +++- source/common/src/tdataformat.c | 206 +++++++++++++++++++++++++++----- 2 files changed, 195 insertions(+), 34 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index ffcb604191..9b582dc50b 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -147,6 +147,21 @@ void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remov int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, void *pMsgBuf); // SColData ================================ +typedef struct { + int8_t cmprAlg; + int8_t columnFlag; + int8_t flag; + int8_t dataType; + int16_t columnId; + int32_t numOfValues; + int32_t bitmapOriginalSize; + int32_t bitmapCompressedSize; + int32_t offsetOriginalSize; + int32_t offsetCompressedSize; + int32_t dataOriginalSize; + int32_t dataCompressedSize; +} SColDataCompressInfo; + typedef void *(*xMallocFn)(void *, int32_t); void tColDataDestroy(void *ph); void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t cflag); @@ -157,6 +172,10 @@ int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool for void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal); int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg); +int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo *info, SBuffer *buffer, + SBuffer *helperBuffer); +int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompressInfo *info, SColData *colData, + SBuffer *helperBuffer); extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull); // for stmt bind @@ -319,9 +338,9 @@ typedef struct { int32_t compressedSize; // fill by compress } SCompressInfo; -int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprInfo, SBuffer *buffer, +int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *info, SBuffer *buffer, SBuffer *helperBuffer); -int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *cmprInfo, SBuffer *buffer, +int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *buffer, SBuffer *helperBuffer); #endif diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index ac5b50c891..d96e402e81 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2635,6 +2635,148 @@ _exit: return code; } +int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo *info, SBuffer *buffer, + SBuffer *helperBuffer) { + int32_t code; + + ASSERT(colData->nVal > 0); + + (*info) = (SColDataCompressInfo){ + .cmprAlg = cmprAlg, + .columnFlag = colData->cflag, + .flag = colData->flag, + .dataType = colData->type, + .columnId = colData->cid, + .numOfValues = colData->nVal, + }; + + if (colData->flag == HAS_NONE || colData->flag == HAS_NULL) { + return 0; + } + + // bitmap + if (colData->flag != HAS_VALUE) { + if (colData->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) { + info->bitmapOriginalSize = BIT2_SIZE(colData->nVal); + } else { + info->bitmapOriginalSize = BIT1_SIZE(colData->nVal); + } + + SCompressInfo cinfo = { + .dataType = TSDB_DATA_TYPE_TINYINT, + .cmprAlg = cmprAlg, + }; + + code = tCompressData(colData->pBitMap, info->bitmapOriginalSize, &cinfo, buffer, helperBuffer); + if (code) return code; + + info->bitmapCompressedSize = cinfo.compressedSize; + } + + if (colData->flag == (HAS_NONE | HAS_NULL)) { + return 0; + } + + // offset + if (IS_VAR_DATA_TYPE(colData->type)) { + info->offsetOriginalSize = sizeof(int32_t) * colData->nVal; + + SCompressInfo cinfo = { + .dataType = TSDB_DATA_TYPE_INT, + .cmprAlg = cmprAlg, + }; + + code = tCompressData(colData->aOffset, info->offsetOriginalSize, &cinfo, buffer, helperBuffer); + if (code) return code; + + info->offsetCompressedSize = cinfo.compressedSize; + } + + // data + if (colData->nData > 0) { + info->dataOriginalSize = colData->nData; + + SCompressInfo cinfo = { + .dataType = colData->type, + .cmprAlg = cmprAlg, + }; + + code = tCompressData(colData->pData, info->dataOriginalSize, &cinfo, buffer, helperBuffer); + if (code) return code; + + info->dataCompressedSize = cinfo.compressedSize; + } + + return 0; +} + +int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompressInfo *info, SColData *colData, + SBuffer *helperBuffer) { + int32_t code; + int32_t size = 0; + + tColDataClear(colData); + + colData->cid = info->columnId; + colData->type = info->dataType; + colData->cflag = info->columnFlag; + colData->nVal = info->numOfValues; + colData->flag = info->flag; + + if (info->flag == HAS_NONE || info->flag == HAS_NULL) { + goto _exit; + } + + // bitmap + if (info->bitmapOriginalSize > 0) { + SCompressInfo cinfo = { + .dataType = TSDB_DATA_TYPE_TINYINT, + .cmprAlg = info->cmprAlg, + .originalSize = info->bitmapOriginalSize, + .compressedSize = info->bitmapCompressedSize, + }; + + code = tDecompressData((char *)input + size, info->bitmapCompressedSize, &cinfo, NULL /* TODO */, helperBuffer); + if (code) return code; + + size += info->bitmapCompressedSize; + } + + if (info->flag == (HAS_NONE | HAS_NULL)) { + goto _exit; + } + + // offset + if (info->offsetOriginalSize > 0) { + // TODO + } + + // data + if (info->dataOriginalSize > 0) { + // TODO + } + +_exit: + ASSERT(inputSize == size); + switch (colData->flag) { + case HAS_NONE: + colData->numOfNone = colData->nVal; + break; + case HAS_NULL: + colData->numOfNull = colData->nVal; + break; + case HAS_VALUE: + colData->numOfValue = colData->nVal; + break; + case (HAS_VALUE | HAS_NULL | HAS_NONE): + // TODO: loop to get the number of each type from bit2 map + break; + default: + // TODO: loop to get the number of each type from bit1 map + } + return 0; +} + int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, char *data) { int32_t code = 0; @@ -3979,12 +4121,12 @@ int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompre return 0; } -int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprInfo, SBuffer *buffer, +int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *info, SBuffer *buffer, SBuffer *helperBuffer) { - cmprInfo->originalSize = inputSize; + info->originalSize = inputSize; - if (cmprInfo->cmprAlg == NO_COMPRESSION) { - cmprInfo->compressedSize = inputSize; + if (info->cmprAlg == NO_COMPRESSION) { + info->compressedSize = inputSize; return tBufferAppend(buffer, input, inputSize); } else { SBuffer hBuffer; @@ -3996,7 +4138,7 @@ int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprI int32_t code = tBufferEnsureCapacity(buffer, buffer->size + extraSizeNeeded); if (code) return code; - if (cmprInfo->cmprAlg == TWO_STAGE_COMP) { + if (info->cmprAlg == TWO_STAGE_COMP) { if (extraBuffer == NULL) { extraBuffer = &hBuffer; } @@ -4005,32 +4147,32 @@ int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprI if (code) return code; } - cmprInfo->compressedSize = tDataTypes[cmprInfo->dataType].compFunc( // - (void *)input, // input - inputSize, // input size - inputSize / tDataTypes[cmprInfo->dataType].bytes, // number of elements - tBufferGetDataEnd(buffer), // output - extraSizeNeeded, // output size - cmprInfo->cmprAlg, // compression algorithm - tBufferGetData(extraBuffer), // helper buffer - extraSizeNeeded // extra buffer size + info->compressedSize = tDataTypes[info->dataType].compFunc( // + (void *)input, // input + inputSize, // input size + inputSize / tDataTypes[info->dataType].bytes, // number of elements + tBufferGetDataEnd(buffer), // output + extraSizeNeeded, // output size + info->cmprAlg, // compression algorithm + tBufferGetData(extraBuffer), // helper buffer + extraSizeNeeded // extra buffer size ); - if (cmprInfo->compressedSize < 0) { + if (info->compressedSize < 0) { tBufferDestroy(&hBuffer); return TSDB_CODE_COMPRESS_ERROR; } - buffer->size += cmprInfo->compressedSize; + buffer->size += info->compressedSize; tBufferDestroy(&hBuffer); } return 0; } -int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *cmprInfo, SBuffer *buffer, +int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *buffer, SBuffer *helperBuffer) { - if (cmprInfo->cmprAlg == NO_COMPRESSION) { - ASSERT(inputSize == cmprInfo->originalSize); + if (info->cmprAlg == NO_COMPRESSION) { + ASSERT(inputSize == info->originalSize); return tBufferAppend(buffer, input, inputSize); } else { SBuffer hBuffer; @@ -4039,32 +4181,32 @@ int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInf tBufferInit(&hBuffer); - code = tBufferEnsureCapacity(buffer, cmprInfo->originalSize); + code = tBufferEnsureCapacity(buffer, info->originalSize); if (code) return code; - if (cmprInfo->cmprAlg == TWO_STAGE_COMP) { + if (info->cmprAlg == TWO_STAGE_COMP) { if (extraBuffer == NULL) { extraBuffer = &hBuffer; } - code = tBufferEnsureCapacity(extraBuffer, cmprInfo->originalSize + COMP_OVERFLOW_BYTES); + code = tBufferEnsureCapacity(extraBuffer, info->originalSize + COMP_OVERFLOW_BYTES); if (code) return code; } - int32_t decompressedSize = tDataTypes[cmprInfo->dataType].decompFunc( - (void *)input, // input - inputSize, // inputSize - cmprInfo->originalSize / tDataTypes[cmprInfo->dataType].bytes, // number of elements - tBufferGetDataEnd(buffer), // output - cmprInfo->originalSize, // output size - cmprInfo->cmprAlg, // compression algorithm - extraBuffer, // helper buffer - cmprInfo->originalSize + COMP_OVERFLOW_BYTES // extra buffer size + int32_t decompressedSize = tDataTypes[info->dataType].decompFunc( + (void *)input, // input + inputSize, // inputSize + info->originalSize / tDataTypes[info->dataType].bytes, // number of elements + tBufferGetDataEnd(buffer), // output + info->originalSize, // output size + info->cmprAlg, // compression algorithm + extraBuffer, // helper buffer + info->originalSize + COMP_OVERFLOW_BYTES // extra buffer size ); if (decompressedSize < 0) { tBufferDestroy(&hBuffer); return TSDB_CODE_COMPRESS_ERROR; } - ASSERT(decompressedSize == cmprInfo->originalSize); + ASSERT(decompressedSize == info->originalSize); buffer->size += decompressedSize; tBufferDestroy(&hBuffer); }