fix invalid compress/decompress

This commit is contained in:
yihaoDeng 2024-03-29 13:48:58 +00:00
parent b7067e61c8
commit d7cee5dd80
3 changed files with 38 additions and 29 deletions

View File

@ -97,12 +97,12 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111
// SValueColumn ================================ // SValueColumn ================================
typedef struct { typedef struct {
uint32_t cmprAlg; // filled by caller int8_t cmprAlg; // filled by caller
int8_t type; int8_t type;
int32_t dataOriginalSize; int32_t dataOriginalSize;
int32_t dataCompressedSize; int32_t dataCompressedSize;
int32_t offsetOriginalSize; int32_t offsetOriginalSize;
int32_t offsetCompressedSize; int32_t offsetCompressedSize;
} SValueColumnCompressInfo; } SValueColumnCompressInfo;
int32_t tValueColumnInit(SValueColumn *valCol); int32_t tValueColumnInit(SValueColumn *valCol);
@ -150,17 +150,17 @@ int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, voi
// SColData ================================ // SColData ================================
typedef struct { typedef struct {
uint32_t cmprAlg; // filled by caller uint32_t cmprAlg; // filled by caller
int8_t columnFlag; int8_t columnFlag;
int8_t flag; int8_t flag;
int8_t dataType; int8_t dataType;
int16_t columnId; int16_t columnId;
int32_t numOfData; int32_t numOfData;
int32_t bitmapOriginalSize; int32_t bitmapOriginalSize;
int32_t bitmapCompressedSize; int32_t bitmapCompressedSize;
int32_t offsetOriginalSize; int32_t offsetOriginalSize;
int32_t offsetCompressedSize; int32_t offsetCompressedSize;
int32_t dataOriginalSize; int32_t dataOriginalSize;
int32_t dataCompressedSize; int32_t dataCompressedSize;
} SColDataCompressInfo; } SColDataCompressInfo;
typedef void *(*xMallocFn)(void *, int32_t); typedef void *(*xMallocFn)(void *, int32_t);

View File

@ -4185,7 +4185,7 @@ int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *info, SBu
uint8_t fmtVer = 0; uint8_t fmtVer = 0;
if ((code = tBufferPutU8(buffer, fmtVer))) return code; if ((code = tBufferPutU8(buffer, fmtVer))) return code;
if ((code = tBufferPutU32(buffer, info->cmprAlg))) return code; if ((code = tBufferPutI8(buffer, info->cmprAlg))) return code;
if ((code = tBufferPutI8(buffer, info->type))) return code; if ((code = tBufferPutI8(buffer, info->type))) return code;
if (IS_VAR_DATA_TYPE(info->type)) { if (IS_VAR_DATA_TYPE(info->type)) {
if ((code = tBufferPutI32v(buffer, info->offsetOriginalSize))) return code; if ((code = tBufferPutI32v(buffer, info->offsetOriginalSize))) return code;
@ -4203,7 +4203,7 @@ int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompre
if ((code = tBufferGetU8(reader, &fmtVer))) return code; if ((code = tBufferGetU8(reader, &fmtVer))) return code;
if (fmtVer == 0) { if (fmtVer == 0) {
if ((code = tBufferGetU32(reader, &info->cmprAlg))) return code; if ((code = tBufferGetI8(reader, &info->cmprAlg))) return code;
if ((code = tBufferGetI8(reader, &info->type))) return code; if ((code = tBufferGetI8(reader, &info->type))) return code;
if (IS_VAR_DATA_TYPE(info->type)) { if (IS_VAR_DATA_TYPE(info->type)) {
if ((code = tBufferGetI32v(reader, &info->offsetOriginalSize))) return code; if ((code = tBufferGetI32v(reader, &info->offsetOriginalSize))) return code;
@ -4233,9 +4233,9 @@ int32_t tCompressData(void *input, // input
extraSizeNeeded = (info->cmprAlg == NO_COMPRESSION) ? info->originalSize : info->originalSize + COMP_OVERFLOW_BYTES; extraSizeNeeded = (info->cmprAlg == NO_COMPRESSION) ? info->originalSize : info->originalSize + COMP_OVERFLOW_BYTES;
ASSERT(outputSize >= extraSizeNeeded); ASSERT(outputSize >= extraSizeNeeded);
DEFINE_VAR(info->cmprAlg) // DEFINE_VAR(info->cmprAlg)
if (info->cmprAlg == NO_COMPRESSION || (l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) || if (info->cmprAlg == NO_COMPRESSION/* || (l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) ||
(l1 == L1_DISABLED && l2 == L2_DISABLED)) { (l1 == L1_DISABLED && l2 == L2_DISABLED)*/) {
memcpy(output, input, info->originalSize); memcpy(output, input, info->originalSize);
info->compressedSize = info->originalSize; info->compressedSize = info->originalSize;
} else if (info->cmprAlg == TWO_STAGE_COMP) { } else if (info->cmprAlg == TWO_STAGE_COMP) {
@ -4271,6 +4271,12 @@ int32_t tCompressData(void *input, // input
tBufferDestroy(&local); tBufferDestroy(&local);
} else { } else {
DEFINE_VAR(info->cmprAlg)
if ((l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) || (l1 == L1_DISABLED && l2 == L2_DISABLED)) {
memcpy(output, input, info->originalSize);
info->compressedSize = info->originalSize;
return 0;
}
SBuffer local; SBuffer local;
tBufferInit(&local); tBufferInit(&local);
@ -4311,9 +4317,7 @@ int32_t tDecompressData(void *input, // input
ASSERT(outputSize >= info->originalSize); ASSERT(outputSize >= info->originalSize);
DEFINE_VAR(info->cmprAlg); if (info->cmprAlg == NO_COMPRESSION) {
if (info->cmprAlg == NO_COMPRESSION || (l1 == L1_DISABLED && l2 == L2_DISABLED)) {
ASSERT(info->compressedSize == info->originalSize); ASSERT(info->compressedSize == info->originalSize);
memcpy(output, input, info->compressedSize); memcpy(output, input, info->compressedSize);
} else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) { } else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) {
@ -4350,6 +4354,11 @@ int32_t tDecompressData(void *input, // input
ASSERT(decompressedSize == info->originalSize); ASSERT(decompressedSize == info->originalSize);
tBufferDestroy(&local); tBufferDestroy(&local);
} else { } else {
DEFINE_VAR(info->cmprAlg);
if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
memcpy(output, input, info->compressedSize);
return 0;
}
SBuffer local; SBuffer local;
tBufferInit(&local); tBufferInit(&local);

View File

@ -206,7 +206,7 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
for (int32_t i = 0; i < 10; i++) { // int64_t for (int32_t i = 0; i < 10; i++) { // int64_t
SCompressInfo cinfo = { SCompressInfo cinfo = {
.cmprAlg = 0, .cmprAlg = brinBlk->cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT, .dataType = TSDB_DATA_TYPE_BIGINT,
.compressedSize = brinBlk->size[i], .compressedSize = brinBlk->size[i],
.originalSize = brinBlk->numRec * sizeof(int64_t), .originalSize = brinBlk->numRec * sizeof(int64_t),
@ -218,7 +218,7 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
for (int32_t i = 10; i < 15; i++) { // int32_t for (int32_t i = 10; i < 15; i++) { // int32_t
SCompressInfo cinfo = { SCompressInfo cinfo = {
.cmprAlg = 0, .cmprAlg = brinBlk->cmprAlg,
.dataType = TSDB_DATA_TYPE_INT, .dataType = TSDB_DATA_TYPE_INT,
.compressedSize = brinBlk->size[i], .compressedSize = brinBlk->size[i],
.originalSize = brinBlk->numRec * sizeof(int32_t), .originalSize = brinBlk->numRec * sizeof(int32_t),
@ -757,7 +757,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmpr
// write to file // write to file
for (int32_t i = 0; i < 10; ++i) { for (int32_t i = 0; i < 10; ++i) {
SCompressInfo info = { SCompressInfo info = {
.cmprAlg = 0, .cmprAlg = cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT, .dataType = TSDB_DATA_TYPE_BIGINT,
.originalSize = brinBlock->buffers[i].size, .originalSize = brinBlock->buffers[i].size,
}; };
@ -773,7 +773,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmpr
} }
for (int32_t i = 10; i < 15; ++i) { for (int32_t i = 10; i < 15; ++i) {
SCompressInfo info = { SCompressInfo info = {
.cmprAlg = 0, .cmprAlg = cmprAlg,
.dataType = TSDB_DATA_TYPE_INT, .dataType = TSDB_DATA_TYPE_INT,
.originalSize = brinBlock->buffers[i].size, .originalSize = brinBlock->buffers[i].size,
}; };