diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 46b899372d..c0b086d401 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -163,15 +163,16 @@ typedef struct { typedef void *(*xMallocFn)(void *, int32_t); -void tColDataDestroy(void *ph); -void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t cflag); -void tColDataClear(SColData *pColData); -void tColDataDeepClear(SColData *pColData); -int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); -int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward); -void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); -uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal); -int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg); +void tColDataDestroy(void *ph); +void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t cflag); +void tColDataClear(SColData *pColData); +void tColDataDeepClear(SColData *pColData); +int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); +int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward); +void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); +uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal); +int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg); + extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull); int32_t tColDataCompress(SColData *colData, SColDataCompressInfo *info, SBuffer *output, SBuffer *assist); @@ -335,14 +336,13 @@ struct SValueColumn { }; typedef struct { - int8_t dataType; // fill by caller - int8_t cmprAlg; // fill by caller - int32_t originalSize; + int8_t dataType; // filled by caller + int8_t cmprAlg; // filled by caller + int32_t originalSize; // filled by caller int32_t compressedSize; } SCompressInfo; int32_t tCompressData(void *input, // input - int32_t inputSize, // input size SCompressInfo *info, // compress info void *output, // output int32_t outputSize, // output size @@ -355,8 +355,8 @@ int32_t tDecompressData(void *input, // input int32_t outputSize, // output size SBuffer *buffer // assistant buffer provided by caller, can be NULL ); -int32_t tCompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist); -int32_t tDecompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist); +int32_t tCompressDataToBuffer(void *input, SCompressInfo *info, SBuffer *output, SBuffer *assist); +int32_t tDecompressDataToBuffer(void *input, SCompressInfo *info, SBuffer *output, SBuffer *assist); #endif diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index c838730b00..c87bafa36e 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -101,9 +101,13 @@ typedef struct { int32_t kvRowSize; } SRowBuildScanInfo; -static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo) { sinfo->numOfNone++; } +static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { + ASSERT((pTColumn->flags & COL_IS_KEY) == 0); + sinfo->numOfNone++; +} static FORCE_INLINE void tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { + ASSERT((pTColumn->flags & COL_IS_KEY) == 0); sinfo->numOfNull++; sinfo->kvMaxOffset = sinfo->kvPayloadSize; sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId); @@ -152,12 +156,9 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS // loop scan for (int32_t i = 1; i < schema->numOfCols; i++) { - bool isPK = ((schema->columns[i].flags & COL_IS_KEY) != 0); - for (;;) { if (colValIndex >= numOfColVals) { - ASSERTS(!isPK, "Primary key should not be NONE or NULL"); - tRowBuildScanAddNone(sinfo); + tRowBuildScanAddNone(sinfo, schema->columns + i); break; } @@ -167,18 +168,15 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) { tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i); } else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { - ASSERTS(!isPK, "Primary key should not be NULL or NONE"); tRowBuildScanAddNull(sinfo, schema->columns + i); } else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) { - ASSERTS(!isPK, "Primary key should not be NULL or NONE"); - tRowBuildScanAddNone(sinfo); + tRowBuildScanAddNone(sinfo, schema->columns + i); } colValIndex++; break; } else if (colValArray[colValIndex].cid > schema->columns[i].colId) { - ASSERTS(!isPK, "Primary key should not be NONE or NULL"); - tRowBuildScanAddNone(sinfo); + tRowBuildScanAddNone(sinfo, schema->columns + i); break; } else { // skip useless value colValIndex++; @@ -259,7 +257,7 @@ static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo uint8_t *primaryKeys = (*ppRow)->data; uint8_t *bitmap = primaryKeys + sinfo->tuplePKSize; uint8_t *fixed = bitmap + sinfo->tupleBitmapSize; - uint8_t *varlen = fixed + schema->flen; + uint8_t *varlen = fixed + sinfo->tupleFixedSize; // primary keys for (int32_t i = 0; i < sinfo->numOfPKs; i++) { @@ -1139,10 +1137,9 @@ int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, in ASSERT(pRow->sver == pTSchema->version); ASSERT(nColData > 0); - uint8_t tflag = pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE); - if (tflag == HAS_NONE) { + if (pRow->flag == HAS_NONE) { return tRowNoneUpsertColData(aColData, nColData, flag); - } else if (tflag == HAS_NULL) { + } else if (pRow->flag == HAS_NULL) { return tRowNullUpsertColData(aColData, nColData, pTSchema, flag); } else if (pRow->flag >> 4) { // KV row return tRowKVUpsertColData(pRow, pTSchema, aColData, nColData, flag); @@ -2622,9 +2619,10 @@ int32_t tColDataCompress(SColData *colData, SColDataCompressInfo *info, SBuffer SCompressInfo cinfo = { .dataType = TSDB_DATA_TYPE_TINYINT, .cmprAlg = info->cmprAlg, + .originalSize = info->bitmapOriginalSize, }; - code = tCompressDataToBuffer(colData->pBitMap, info->bitmapOriginalSize, &cinfo, output, assist); + code = tCompressDataToBuffer(colData->pBitMap, &cinfo, output, assist); if (code) { tBufferDestroy(&local); return code; @@ -2645,9 +2643,10 @@ int32_t tColDataCompress(SColData *colData, SColDataCompressInfo *info, SBuffer SCompressInfo cinfo = { .dataType = TSDB_DATA_TYPE_INT, .cmprAlg = info->cmprAlg, + .originalSize = info->offsetOriginalSize, }; - code = tCompressDataToBuffer(colData->aOffset, info->offsetOriginalSize, &cinfo, output, assist); + code = tCompressDataToBuffer(colData->aOffset, &cinfo, output, assist); if (code) { tBufferDestroy(&local); return code; @@ -2663,9 +2662,10 @@ int32_t tColDataCompress(SColData *colData, SColDataCompressInfo *info, SBuffer SCompressInfo cinfo = { .dataType = colData->type, .cmprAlg = info->cmprAlg, + .originalSize = info->dataOriginalSize, }; - code = tCompressDataToBuffer(colData->pData, info->dataOriginalSize, &cinfo, output, assist); + code = tCompressDataToBuffer(colData->pData, &cinfo, output, assist); if (code) { tBufferDestroy(&local); return code; @@ -4046,9 +4046,10 @@ int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *inf SCompressInfo cinfo = { .cmprAlg = info->cmprAlg, .dataType = TSDB_DATA_TYPE_INT, + .originalSize = valCol->offsets.size, }; - code = tCompressDataToBuffer(valCol->offsets.data, valCol->offsets.size, &cinfo, output, assist); + code = tCompressDataToBuffer(valCol->offsets.data, &cinfo, output, assist); if (code) return code; info->offsetOriginalSize = cinfo.originalSize; @@ -4059,9 +4060,10 @@ int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *inf SCompressInfo cinfo = { .cmprAlg = info->cmprAlg, .dataType = valCol->type, + .originalSize = valCol->data.size, }; - code = tCompressDataToBuffer(valCol->data.data, valCol->data.size, &cinfo, output, assist); + code = tCompressDataToBuffer(valCol->data.data, &cinfo, output, assist); if (code) return code; info->dataOriginalSize = cinfo.originalSize; @@ -4182,7 +4184,6 @@ int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompre } int32_t tCompressData(void *input, // input - int32_t inputSize, // input size SCompressInfo *info, // compress info void *output, // output int32_t outputSize, // output size @@ -4191,13 +4192,12 @@ int32_t tCompressData(void *input, // input int32_t extraSizeNeeded; int32_t code; - extraSizeNeeded = (info->cmprAlg == NO_COMPRESSION) ? inputSize : inputSize + COMP_OVERFLOW_BYTES; + extraSizeNeeded = (info->cmprAlg == NO_COMPRESSION) ? info->originalSize : info->originalSize + COMP_OVERFLOW_BYTES; ASSERT(outputSize >= extraSizeNeeded); - info->originalSize = inputSize; if (info->cmprAlg == NO_COMPRESSION) { - memcpy(output, input, inputSize); - info->compressedSize = inputSize; + memcpy(output, input, info->originalSize); + info->compressedSize = info->originalSize; } else { SBuffer local; @@ -4216,8 +4216,8 @@ int32_t tCompressData(void *input, // input info->compressedSize = tDataTypes[info->dataType].compFunc( // input, // input - inputSize, // input size - inputSize / tDataTypes[info->dataType].bytes, // number of elements + info->originalSize, // input size + info->originalSize / tDataTypes[info->dataType].bytes, // number of elements output, // output outputSize, // output size info->cmprAlg, // compression algorithm @@ -4287,26 +4287,27 @@ int32_t tDecompressData(void *input, // input return 0; } -int32_t tCompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist) { +int32_t tCompressDataToBuffer(void *input, SCompressInfo *info, SBuffer *output, SBuffer *assist) { int32_t code; - code = tBufferEnsureCapacity(output, output->size + inputSize + COMP_OVERFLOW_BYTES); + code = tBufferEnsureCapacity(output, output->size + info->originalSize + COMP_OVERFLOW_BYTES); if (code) return code; - code = tCompressData(input, inputSize, info, tBufferGetDataEnd(output), output->capacity - output->size, assist); + code = tCompressData(input, info, tBufferGetDataEnd(output), output->capacity - output->size, assist); if (code) return code; output->size += info->compressedSize; return 0; } -int32_t tDecompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist) { +int32_t tDecompressDataToBuffer(void *input, SCompressInfo *info, SBuffer *output, SBuffer *assist) { int32_t code; code = tBufferEnsureCapacity(output, output->size + info->originalSize); if (code) return code; - code = tDecompressData(input, inputSize, info, tBufferGetDataEnd(output), output->capacity - output->size, assist); + code = tDecompressData(input, info->compressedSize, info, tBufferGetDataEnd(output), output->capacity - output->size, + assist); if (code) return code; output->size += info->originalSize; diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index c366be6904..9bdbfae968 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -207,8 +207,8 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB .compressedSize = brinBlk->size[i], .originalSize = brinBlk->numRec * sizeof(int64_t), }; - code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), brinBlk->size[i], &cinfo, - brinBlock->buffers + i, reader->buffers + 1); + code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), &cinfo, brinBlock->buffers + i, + reader->buffers + 1); TSDB_CHECK_CODE(code, lino, _exit); br.offset += brinBlk->size[i]; } @@ -220,8 +220,8 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB .compressedSize = brinBlk->size[i], .originalSize = brinBlk->numRec * sizeof(int32_t), }; - code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), brinBlk->size[i], &cinfo, - brinBlock->buffers + i, reader->buffers + 1); + code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), &cinfo, brinBlock->buffers + i, + reader->buffers + 1); TSDB_CHECK_CODE(code, lino, _exit); br.offset += brinBlk->size[i]; } @@ -490,7 +490,7 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB .originalSize = tombBlk->numRec * sizeof(int64_t), .compressedSize = tombBlk->size[i], }; - code = tDecompressDataToBuffer(BR_PTR(&br), cinfo.compressedSize, &cinfo, tData->buffers + i, reader->buffers + 1); + code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, reader->buffers + 1); TSDB_CHECK_CODE(code, lino, _exit); br.offset += tombBlk->size[i]; } @@ -747,6 +747,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl SBuffer *bf = &brinBlock->buffers[i]; SCompressInfo info = { .cmprAlg = cmprAlg, + .originalSize = bf->size, }; if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) { @@ -758,7 +759,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl } tBufferClear(&buffers[0]); - code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffers[0].data, &buffers[1]); + code = tCompressDataToBuffer(tBufferGetData(bf), &info, buffers[0].data, &buffers[1]); if (code) return code; code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size); @@ -1250,7 +1251,7 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl .dataType = TSDB_DATA_TYPE_BIGINT, .originalSize = tombBlock->buffers[i].size, }; - code = tCompressDataToBuffer(tombBlock->buffers[i].data, cinfo.originalSize, &cinfo, &buffers[0], &buffers[1]); + code = tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, &buffers[0], &buffers[1]); if (code) return code; code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size); diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 7e88a38bcd..2e29911683 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -310,8 +310,7 @@ int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk .originalSize = tombBlk->numRec * sizeof(int64_t), .compressedSize = tombBlk->size[i], }; - code = - tDecompressDataToBuffer(BR_PTR(&br), cinfo.compressedSize, &cinfo, tombBlock->buffers + i, reader->buffers + 1); + code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tombBlock->buffers + i, reader->buffers + 1); TSDB_CHECK_CODE(code, lino, _exit); br.offset += tombBlk->size[i]; } @@ -348,8 +347,8 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta .originalSize = statisBlk->numRec * sizeof(int64_t), }; - code = tDecompressDataToBuffer(tBufferGetDataAt(&reader->buffers[0], size), statisBlk->size[i], &info, - &statisBlock->buffers[i], &reader->buffers[1]); + code = tDecompressDataToBuffer(tBufferGetDataAt(&reader->buffers[0], size), &info, &statisBlock->buffers[i], + &reader->buffers[1]); TSDB_CHECK_CODE(code, lino, _exit); size += statisBlk->size[i]; } @@ -519,11 +518,12 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { SCompressInfo info = { .dataType = TSDB_DATA_TYPE_BIGINT, .cmprAlg = statisBlk.cmprAlg, + .originalSize = statisBlock->buffers[i].size, }; tBufferClear(&writer->buffers[0]); - code = tCompressDataToBuffer(tBufferGetData(&statisBlock->buffers[i]), tBufferGetSize(&statisBlock->buffers[i]), - &info, &writer->buffers[0], &writer->buffers[1]); + code = tCompressDataToBuffer(tBufferGetData(&statisBlock->buffers[i]), &info, &writer->buffers[0], + &writer->buffers[1]); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbWriteFile(writer->fd, writer->file->size, tBufferGetData(&writer->buffers[0]), info.compressedSize); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 75711db380..44d2fc1298 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1390,7 +1390,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S .dataType = TSDB_DATA_TYPE_BIGINT, .originalSize = sizeof(int64_t) * bData->nRow, }; - code = tCompressDataToBuffer(bData->aUid, cinfo.originalSize, &cinfo, buffer, assist); + code = tCompressDataToBuffer(bData->aUid, &cinfo, buffer, assist); TSDB_CHECK_CODE(code, lino, _exit); hdr->szUid = cinfo.compressedSize; } @@ -1401,7 +1401,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S .dataType = TSDB_DATA_TYPE_BIGINT, .originalSize = sizeof(int64_t) * bData->nRow, }; - code = tCompressDataToBuffer((uint8_t *)bData->aVersion, cinfo.originalSize, &cinfo, buffer, assist); + code = tCompressDataToBuffer((uint8_t *)bData->aVersion, &cinfo, buffer, assist); TSDB_CHECK_CODE(code, lino, _exit); hdr->szVer = cinfo.compressedSize; @@ -1411,7 +1411,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S .dataType = TSDB_DATA_TYPE_TIMESTAMP, .originalSize = sizeof(TSKEY) * bData->nRow, }; - code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, cinfo.originalSize, &cinfo, buffer, assist); + code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist); TSDB_CHECK_CODE(code, lino, _exit); hdr->szKey = cinfo.compressedSize;