diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index f0e41acbb9..d16408dd55 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -97,12 +97,12 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111 // SValueColumn ================================ typedef struct { - uint32_t cmprAlg; // filled by caller - int8_t type; - int32_t dataOriginalSize; - int32_t dataCompressedSize; - int32_t offsetOriginalSize; - int32_t offsetCompressedSize; + int8_t cmprAlg; // filled by caller + int8_t type; + int32_t dataOriginalSize; + int32_t dataCompressedSize; + int32_t offsetOriginalSize; + int32_t offsetCompressedSize; } SValueColumnCompressInfo; int32_t tValueColumnInit(SValueColumn *valCol); @@ -150,17 +150,17 @@ int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, voi // SColData ================================ typedef struct { uint32_t cmprAlg; // filled by caller - int8_t columnFlag; - int8_t flag; - int8_t dataType; - int16_t columnId; - int32_t numOfData; - int32_t bitmapOriginalSize; - int32_t bitmapCompressedSize; - int32_t offsetOriginalSize; - int32_t offsetCompressedSize; - int32_t dataOriginalSize; - int32_t dataCompressedSize; + int8_t columnFlag; + int8_t flag; + int8_t dataType; + int16_t columnId; + int32_t numOfData; + int32_t bitmapOriginalSize; + int32_t bitmapCompressedSize; + int32_t offsetOriginalSize; + int32_t offsetCompressedSize; + int32_t dataOriginalSize; + int32_t dataCompressedSize; } SColDataCompressInfo; typedef void *(*xMallocFn)(void *, int32_t); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 357627b907..6b4a4d8479 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -4185,7 +4185,7 @@ int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *info, SBu uint8_t fmtVer = 0; if ((code = tBufferPutU8(buffer, fmtVer))) return code; - if ((code = tBufferPutU32(buffer, info->cmprAlg))) return code; + if ((code = tBufferPutI8(buffer, info->cmprAlg))) return code; if ((code = tBufferPutI8(buffer, info->type))) return code; if (IS_VAR_DATA_TYPE(info->type)) { if ((code = tBufferPutI32v(buffer, info->offsetOriginalSize))) return code; @@ -4203,7 +4203,7 @@ int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompre if ((code = tBufferGetU8(reader, &fmtVer))) return code; if (fmtVer == 0) { - if ((code = tBufferGetU32(reader, &info->cmprAlg))) return code; + if ((code = tBufferGetI8(reader, &info->cmprAlg))) return code; if ((code = tBufferGetI8(reader, &info->type))) return code; if (IS_VAR_DATA_TYPE(info->type)) { if ((code = tBufferGetI32v(reader, &info->offsetOriginalSize))) return code; @@ -4233,9 +4233,9 @@ int32_t tCompressData(void *input, // input extraSizeNeeded = (info->cmprAlg == NO_COMPRESSION) ? info->originalSize : info->originalSize + COMP_OVERFLOW_BYTES; ASSERT(outputSize >= extraSizeNeeded); - DEFINE_VAR(info->cmprAlg) - if (info->cmprAlg == NO_COMPRESSION || (l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) || - (l1 == L1_DISABLED && l2 == L2_DISABLED)) { + // DEFINE_VAR(info->cmprAlg) + if (info->cmprAlg == NO_COMPRESSION/* || (l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) || + (l1 == L1_DISABLED && l2 == L2_DISABLED)*/) { memcpy(output, input, info->originalSize); info->compressedSize = info->originalSize; } else if (info->cmprAlg == TWO_STAGE_COMP) { @@ -4271,6 +4271,12 @@ int32_t tCompressData(void *input, // input tBufferDestroy(&local); } else { + DEFINE_VAR(info->cmprAlg) + if ((l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) || (l1 == L1_DISABLED && l2 == L2_DISABLED)) { + memcpy(output, input, info->originalSize); + info->compressedSize = info->originalSize; + return 0; + } SBuffer local; tBufferInit(&local); @@ -4311,9 +4317,7 @@ int32_t tDecompressData(void *input, // input ASSERT(outputSize >= info->originalSize); - DEFINE_VAR(info->cmprAlg); - - if (info->cmprAlg == NO_COMPRESSION || (l1 == L1_DISABLED && l2 == L2_DISABLED)) { + if (info->cmprAlg == NO_COMPRESSION) { ASSERT(info->compressedSize == info->originalSize); memcpy(output, input, info->compressedSize); } else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) { @@ -4350,6 +4354,11 @@ int32_t tDecompressData(void *input, // input ASSERT(decompressedSize == info->originalSize); tBufferDestroy(&local); } else { + DEFINE_VAR(info->cmprAlg); + if (l1 == L1_DISABLED && l2 == L2_DISABLED) { + memcpy(output, input, info->compressedSize); + return 0; + } SBuffer local; tBufferInit(&local); diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 5219b38e47..6a2a0dfb61 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -206,7 +206,7 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB for (int32_t i = 0; i < 10; i++) { // int64_t SCompressInfo cinfo = { - .cmprAlg = 0, + .cmprAlg = brinBlk->cmprAlg, .dataType = TSDB_DATA_TYPE_BIGINT, .compressedSize = brinBlk->size[i], .originalSize = brinBlk->numRec * sizeof(int64_t), @@ -218,7 +218,7 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB for (int32_t i = 10; i < 15; i++) { // int32_t SCompressInfo cinfo = { - .cmprAlg = 0, + .cmprAlg = brinBlk->cmprAlg, .dataType = TSDB_DATA_TYPE_INT, .compressedSize = brinBlk->size[i], .originalSize = brinBlk->numRec * sizeof(int32_t), @@ -757,7 +757,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmpr // write to file for (int32_t i = 0; i < 10; ++i) { SCompressInfo info = { - .cmprAlg = 0, + .cmprAlg = cmprAlg, .dataType = TSDB_DATA_TYPE_BIGINT, .originalSize = brinBlock->buffers[i].size, }; @@ -773,7 +773,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmpr } for (int32_t i = 10; i < 15; ++i) { SCompressInfo info = { - .cmprAlg = 0, + .cmprAlg = cmprAlg, .dataType = TSDB_DATA_TYPE_INT, .originalSize = brinBlock->buffers[i].size, };