diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 9b582dc50b..6f400e2542 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -96,12 +96,12 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111 // SValueColumn ================================ typedef struct { - int8_t cmprAlg; // filled by caller - int8_t type; // filled by compress - int32_t originalDataSize; // filled by compress - int32_t compressedDataSize; // filled by compress - int32_t originalOffsetSize; // filled by compress - int32_t compressedOffsetSize; // filled by compress + int8_t cmprAlg; // filled by caller + int8_t type; + int32_t originalDataSize; + int32_t compressedDataSize; + int32_t originalOffsetSize; + int32_t compressedOffsetSize; } SValueColumnCompressInfo; int32_t tValueColumnInit(SValueColumn *valCol); @@ -109,10 +109,9 @@ int32_t tValueColumnDestroy(SValueColumn *valCol); int32_t tValueColumnClear(SValueColumn *valCol); int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value); int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value); -int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *compressInfo, SBuffer *buffer, - SBuffer *helperBuffer); +int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *info, SBuffer *output, SBuffer *assist); int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *compressInfo, - SValueColumn *valCol, SBuffer *helperBuffer); + SValueColumn *valCol, SBuffer *buffer); int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer); int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo); int32_t tValueCompare(const SValue *tv1, const SValue *tv2); @@ -148,12 +147,12 @@ int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, voi // SColData ================================ typedef struct { - int8_t cmprAlg; + int8_t cmprAlg; // filled by caller int8_t columnFlag; int8_t flag; int8_t dataType; int16_t columnId; - int32_t numOfValues; + int32_t numOfData; int32_t bitmapOriginalSize; int32_t bitmapCompressedSize; int32_t offsetOriginalSize; @@ -172,8 +171,8 @@ int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool for void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal); int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg); -int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo *info, SBuffer *buffer, - SBuffer *helperBuffer); +int32_t tColDataCompress(SColData *colData, SColDataCompressInfo *info, void *output, int32_t outputSize, + SBuffer *buffer); int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompressInfo *info, SColData *colData, SBuffer *helperBuffer); extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull); @@ -332,16 +331,29 @@ struct SValueColumn { }; typedef struct { - int8_t dataType; // fill by caller - int8_t cmprAlg; // fill by caller - int32_t originalSize; // fill by compress - int32_t compressedSize; // fill by compress + int8_t dataType; // fill by caller + int8_t cmprAlg; // fill by caller + int32_t originalSize; + int32_t compressedSize; } SCompressInfo; -int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *info, SBuffer *buffer, - SBuffer *helperBuffer); -int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *buffer, - SBuffer *helperBuffer); +int32_t tCompressData(void *input, // input + int32_t inputSize, // input size + SCompressInfo *info, // compress info + void *output, // output + int32_t outputSize, // output size + SBuffer *buffer // assistant buffer provided by caller, can be NULL +); +int32_t tDecompressData(void *input, // input + int32_t inputSize, // input size + const SCompressInfo *info, // compress info + void *output, // output + int32_t outputSize, // output size + SBuffer *buffer // assistant buffer provided by caller, can be NULL +); +int32_t tCompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist); +int32_t tDecompressDataToBuffer(void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *output, + SBuffer *assist); #endif diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 7d327e7263..532c1bf39d 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2635,25 +2635,38 @@ _exit: return code; } -int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo *info, SBuffer *buffer, - SBuffer *helperBuffer) { +int32_t tColDataCompress(SColData *colData, // column data + SColDataCompressInfo *info, // compress info + void *output, // output + int32_t outputSize, // output size + SBuffer *buffer // assistant buffer +) { int32_t code; + SBuffer local; + char *outputStart = output; ASSERT(colData->nVal > 0); + tBufferInit(&local); + (*info) = (SColDataCompressInfo){ - .cmprAlg = cmprAlg, + .cmprAlg = info->cmprAlg, .columnFlag = colData->cflag, .flag = colData->flag, .dataType = colData->type, .columnId = colData->cid, - .numOfValues = colData->nVal, + .numOfData = colData->nVal, }; if (colData->flag == HAS_NONE || colData->flag == HAS_NULL) { + tBufferDestroy(&local); return 0; } + if (buffer == NULL) { + buffer = &local; + } + // bitmap if (colData->flag != HAS_VALUE) { if (colData->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) { @@ -2664,32 +2677,43 @@ int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo SCompressInfo cinfo = { .dataType = TSDB_DATA_TYPE_TINYINT, - .cmprAlg = cmprAlg, + .cmprAlg = info->cmprAlg, }; - code = tCompressData(colData->pBitMap, info->bitmapOriginalSize, &cinfo, buffer, helperBuffer); - if (code) return code; + code = tCompressData(colData->pBitMap, info->bitmapOriginalSize, &cinfo, outputStart, outputSize, buffer); + if (code) { + tBufferDestroy(&local); + return code; + } info->bitmapCompressedSize = cinfo.compressedSize; + outputStart += cinfo.compressedSize; + outputSize -= cinfo.compressedSize; } if (colData->flag == (HAS_NONE | HAS_NULL)) { + tBufferDestroy(&local); return 0; } // offset if (IS_VAR_DATA_TYPE(colData->type)) { - info->offsetOriginalSize = sizeof(int32_t) * colData->nVal; + info->offsetOriginalSize = sizeof(int32_t) * info->numOfData; SCompressInfo cinfo = { .dataType = TSDB_DATA_TYPE_INT, - .cmprAlg = cmprAlg, + .cmprAlg = info->cmprAlg, }; - code = tCompressData(colData->aOffset, info->offsetOriginalSize, &cinfo, buffer, helperBuffer); - if (code) return code; + code = tCompressData(colData->aOffset, info->offsetOriginalSize, &cinfo, outputStart, outputSize, buffer); + if (code) { + tBufferDestroy(&local); + return code; + } info->offsetCompressedSize = cinfo.compressedSize; + outputStart += cinfo.compressedSize; + outputSize -= cinfo.compressedSize; } // data @@ -2698,29 +2722,42 @@ int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo SCompressInfo cinfo = { .dataType = colData->type, - .cmprAlg = cmprAlg, + .cmprAlg = info->cmprAlg, }; - code = tCompressData(colData->pData, info->dataOriginalSize, &cinfo, buffer, helperBuffer); - if (code) return code; + code = tCompressData(colData->pData, info->dataOriginalSize, &cinfo, outputStart, outputSize, buffer); + if (code) { + tBufferDestroy(&local); + return code; + } info->dataCompressedSize = cinfo.compressedSize; + outputStart += cinfo.compressedSize; + outputSize -= cinfo.compressedSize; } + tBufferDestroy(&local); return 0; } int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompressInfo *info, SColData *colData, - SBuffer *helperBuffer) { + SBuffer *buffer) { int32_t code; - int32_t size = 0; + SBuffer local; + char *inputStart = input; + + ASSERT(inputSize == info->bitmapCompressedSize + info->offsetCompressedSize + info->dataCompressedSize); + + tBufferInit(&local); + if (buffer == NULL) { + buffer = &local; + } tColDataClear(colData); - colData->cid = info->columnId; colData->type = info->dataType; colData->cflag = info->columnFlag; - colData->nVal = info->numOfValues; + colData->nVal = info->numOfData; colData->flag = info->flag; if (info->flag == HAS_NONE || info->flag == HAS_NULL) { @@ -2736,10 +2773,19 @@ int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompres .compressedSize = info->bitmapCompressedSize, }; - code = tDecompressData((char *)input + size, info->bitmapCompressedSize, &cinfo, NULL /* TODO */, helperBuffer); - if (code) return code; + code = tRealloc(&colData->pBitMap, cinfo.originalSize); + if (code) { + tBufferDestroy(&local); + return code; + } - size += info->bitmapCompressedSize; + code = tDecompressData(inputStart, cinfo.compressedSize, &cinfo, colData->pBitMap, cinfo.originalSize, buffer); + if (code) { + tBufferDestroy(&local); + return code; + } + + inputStart += cinfo.compressedSize; } if (info->flag == (HAS_NONE | HAS_NULL)) { @@ -2748,16 +2794,55 @@ int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompres // offset if (info->offsetOriginalSize > 0) { - // TODO + SCompressInfo cinfo = { + .cmprAlg = info->cmprAlg, + .dataType = TSDB_DATA_TYPE_INT, + .originalSize = info->offsetOriginalSize, + .compressedSize = info->offsetCompressedSize, + }; + + code = tRealloc((uint8_t **)&colData->aOffset, cinfo.originalSize); + if (code) { + tBufferDestroy(&local); + return code; + } + + code = tDecompressData(inputStart, cinfo.compressedSize, &cinfo, colData->aOffset, cinfo.originalSize, buffer); + if (code) { + tBufferDestroy(&local); + return code; + } + + inputStart += cinfo.compressedSize; } // data if (info->dataOriginalSize > 0) { - // TODO + colData->nData = info->dataOriginalSize; + + SCompressInfo cinfo = { + .cmprAlg = info->cmprAlg, + .dataType = colData->type, + .originalSize = info->dataOriginalSize, + .compressedSize = info->dataCompressedSize, + }; + + code = tRealloc((uint8_t **)&colData->pData, cinfo.originalSize); + if (code) { + tBufferDestroy(&local); + return code; + } + + code = tDecompressData(inputStart, cinfo.compressedSize, &cinfo, colData->pData, cinfo.originalSize, buffer); + if (code) { + tBufferDestroy(&local); + return code; + } + + inputStart += cinfo.compressedSize; } _exit: - ASSERT(inputSize == size); switch (colData->flag) { case HAS_NONE: colData->numOfNone = colData->nVal; @@ -2768,13 +2853,19 @@ _exit: case HAS_VALUE: colData->numOfValue = colData->nVal; break; - case (HAS_VALUE | HAS_NULL | HAS_NONE): - // TODO: loop to get the number of each type from bit2 map - break; default: - ASSERT(0); - // TODO: loop to get the number of each type from bit1 map + for (int32_t i = 0; i < colData->nVal; i++) { + uint8_t bitValue = tColDataGetBitValue(colData, i); + if (bitValue == 0) { + colData->numOfNone++; + } else if (bitValue == 1) { + colData->numOfNull++; + } else { + colData->numOfValue++; + } + } } + tBufferDestroy(&local); return 0; } @@ -4003,69 +4094,122 @@ int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value) { return 0; } -int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *compressInfo, SBuffer *buffer, - SBuffer *helperBuffer) { - int32_t code; - SCompressInfo info; +int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *info, SBuffer *output, SBuffer *assist) { + int32_t code; + SBuffer local; - compressInfo->type = valCol->type; + (*info) = (SValueColumnCompressInfo){ + .cmprAlg = info->cmprAlg, + .type = valCol->type, + }; - if (IS_VAR_DATA_TYPE(valCol->type)) { - info.dataType = TSDB_DATA_TYPE_INT; - info.cmprAlg = compressInfo->cmprAlg; - code = - tCompressData(tBufferGetData(&valCol->offsets), tBufferGetSize(&valCol->offsets), &info, buffer, helperBuffer); - if (code) return code; - - compressInfo->originalOffsetSize = info.originalSize; - compressInfo->compressedOffsetSize = info.compressedSize; - } else { - compressInfo->originalOffsetSize = 0; - compressInfo->compressedOffsetSize = 0; + tBufferInit(&local); + if (assist == NULL) { + assist = &local; } - info.dataType = valCol->type; - info.cmprAlg = compressInfo->cmprAlg; - code = tCompressData(tBufferGetData(&valCol->data), tBufferGetSize(&valCol->data), &info, buffer, helperBuffer); - if (code) return code; - compressInfo->originalDataSize = info.originalSize; - compressInfo->compressedDataSize = info.compressedSize; + // offset + if (IS_VAR_DATA_TYPE(valCol->type)) { + SCompressInfo cinfo = { + .cmprAlg = info->cmprAlg, + .dataType = TSDB_DATA_TYPE_INT, + }; + code = tCompressDataToBuffer(valCol->offsets.data, valCol->offsets.size, &cinfo, output, assist); + if (code) { + tBufferDestroy(&local); + return code; + } + + info->originalOffsetSize = cinfo.originalSize; + info->compressedOffsetSize = cinfo.compressedSize; + } + + // data + SCompressInfo cinfo = { + .cmprAlg = info->cmprAlg, + .dataType = valCol->type, + }; + + code = tCompressDataToBuffer(valCol->data.data, valCol->data.size, &cinfo, output, assist); + if (code) { + tBufferGetData(&local); + return code; + } + info->originalDataSize = cinfo.originalSize; + info->compressedDataSize = cinfo.compressedSize; + + tBufferDestroy(&local); return 0; } -int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *compressInfo, - SValueColumn *valCol, SBuffer *helperBuffer) { - int32_t code; - SCompressInfo info; +int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *info, + SValueColumn *valCol, SBuffer *buffer) { + int32_t code; + SBuffer local; + char *inputStart = input; + + ASSERT(inputSize == info->compressedOffsetSize + info->compressedDataSize); tValueColumnClear(valCol); + tBufferInit(&local); - valCol->type = compressInfo->type; - if (IS_VAR_DATA_TYPE(valCol->type)) { - ASSERT(inputSize == compressInfo->compressedOffsetSize + compressInfo->compressedDataSize); - - info.dataType = TSDB_DATA_TYPE_INT; - info.cmprAlg = compressInfo->cmprAlg; - info.originalSize = compressInfo->originalOffsetSize; - info.compressedSize = compressInfo->compressedOffsetSize; - code = tDecompressData(input, compressInfo->compressedOffsetSize, &info, &valCol->offsets, helperBuffer); - if (code) return code; - - valCol->numOfValues = compressInfo->originalOffsetSize / tDataTypes[TSDB_DATA_TYPE_INT].bytes; - } else { - ASSERT(inputSize == compressInfo->compressedDataSize); - valCol->numOfValues = compressInfo->originalDataSize / tDataTypes[valCol->type].bytes; + if (buffer == NULL) { + buffer = &local; } - info.dataType = valCol->type; - info.cmprAlg = compressInfo->cmprAlg; - info.originalSize = compressInfo->originalDataSize; - info.compressedSize = compressInfo->compressedDataSize; - code = tDecompressData((char *)input + compressInfo->compressedOffsetSize, compressInfo->compressedDataSize, &info, - &valCol->data, helperBuffer); - if (code) return code; + valCol->type = info->type; + // offset + if (IS_VAR_DATA_TYPE(valCol->type)) { + valCol->numOfValues = info->originalOffsetSize / tDataTypes[TSDB_DATA_TYPE_INT].bytes; + SCompressInfo cinfo = { + .dataType = TSDB_DATA_TYPE_INT, + .cmprAlg = info->cmprAlg, + .originalSize = info->originalOffsetSize, + .compressedSize = info->compressedOffsetSize, + }; + + code = tBufferEnsureCapacity(&valCol->offsets, cinfo.originalSize); + if (code) { + tBufferDestroy(&local); + return code; + } + + code = tDecompressData(inputStart, cinfo.compressedSize, &cinfo, valCol->offsets.data, cinfo.originalSize, buffer); + if (code) { + tBufferDestroy(&local); + return code; + } + valCol->offsets.size = cinfo.originalSize; + inputStart += cinfo.compressedSize; + } else { + valCol->numOfValues = info->originalDataSize / tDataTypes[valCol->type].bytes; + } + + // data + SCompressInfo cinfo = { + .dataType = valCol->type, + .cmprAlg = info->cmprAlg, + .originalSize = info->originalDataSize, + .compressedSize = info->compressedDataSize, + }; + + code = tBufferEnsureCapacity(&valCol->data, cinfo.originalSize); + if (code) { + tBufferDestroy(&local); + return code; + } + + code = tDecompressData(inputStart, cinfo.compressedSize, &cinfo, valCol->data.data, cinfo.originalSize, buffer); + if (code) { + tBufferDestroy(&local); + return code; + } + valCol->data.size = cinfo.originalSize; + inputStart += cinfo.compressedSize; + + tBufferDestroy(&local); return 0; } @@ -4122,95 +4266,128 @@ int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompre return 0; } -int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *info, SBuffer *buffer, - SBuffer *helperBuffer) { +int32_t tCompressData(void *input, // input + int32_t inputSize, // input size + SCompressInfo *info, // compress info + void *output, // output + int32_t outputSize, // output size + SBuffer *buffer // assistant buffer provided by caller, can be NULL +) { + int32_t extraSizeNeeded; + int32_t code; + + extraSizeNeeded = (info->cmprAlg == NO_COMPRESSION) ? inputSize : inputSize + COMP_OVERFLOW_BYTES; + ASSERT(outputSize >= extraSizeNeeded); + info->originalSize = inputSize; - if (info->cmprAlg == NO_COMPRESSION) { + memcpy(output, input, inputSize); info->compressedSize = inputSize; - return tBufferAppend(buffer, input, inputSize); } else { - SBuffer hBuffer; - SBuffer *extraBuffer = helperBuffer; - int32_t extraSizeNeeded = inputSize + COMP_OVERFLOW_BYTES; - - tBufferInit(&hBuffer); - - int32_t code = tBufferEnsureCapacity(buffer, buffer->size + extraSizeNeeded); - if (code) return code; + SBuffer local; + tBufferInit(&local); if (info->cmprAlg == TWO_STAGE_COMP) { - if (extraBuffer == NULL) { - extraBuffer = &hBuffer; + if (buffer == NULL) { + buffer = &local; } - code = tBufferEnsureCapacity(extraBuffer, extraSizeNeeded); + code = tBufferEnsureCapacity(buffer, extraSizeNeeded); if (code) return code; } info->compressedSize = tDataTypes[info->dataType].compFunc( // - (void *)input, // input + input, // input inputSize, // input size inputSize / tDataTypes[info->dataType].bytes, // number of elements - tBufferGetDataEnd(buffer), // output - extraSizeNeeded, // output size + output, // output + outputSize, // output size info->cmprAlg, // compression algorithm - tBufferGetData(extraBuffer), // helper buffer - extraSizeNeeded // extra buffer size + buffer->data, // buffer + extraSizeNeeded // buffer size ); if (info->compressedSize < 0) { - tBufferDestroy(&hBuffer); + tBufferDestroy(&local); return TSDB_CODE_COMPRESS_ERROR; } - buffer->size += info->compressedSize; - tBufferDestroy(&hBuffer); + tBufferDestroy(&local); } return 0; } -int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *buffer, - SBuffer *helperBuffer) { +int32_t tDecompressData(void *input, // input + int32_t inputSize, // input size + const SCompressInfo *info, // compress info + void *output, // output + int32_t outputSize, // output size + SBuffer *buffer // assistant buffer provided by caller, can be NULL +) { + int32_t code; + + ASSERT(outputSize >= info->originalSize); + if (info->cmprAlg == NO_COMPRESSION) { ASSERT(inputSize == info->originalSize); - return tBufferAppend(buffer, input, inputSize); + memcpy(output, input, inputSize); } else { - SBuffer hBuffer; - SBuffer *extraBuffer = helperBuffer; - int32_t code; - - tBufferInit(&hBuffer); - - code = tBufferEnsureCapacity(buffer, info->originalSize); - if (code) return code; + SBuffer local; + tBufferInit(&local); if (info->cmprAlg == TWO_STAGE_COMP) { - if (extraBuffer == NULL) { - extraBuffer = &hBuffer; + if (buffer == NULL) { + buffer = &local; } - code = tBufferEnsureCapacity(extraBuffer, info->originalSize + COMP_OVERFLOW_BYTES); + code = tBufferEnsureCapacity(buffer, info->originalSize + COMP_OVERFLOW_BYTES); if (code) return code; } int32_t decompressedSize = tDataTypes[info->dataType].decompFunc( - (void *)input, // input + input, // input inputSize, // inputSize info->originalSize / tDataTypes[info->dataType].bytes, // number of elements - tBufferGetDataEnd(buffer), // output - info->originalSize, // output size + output, // output + outputSize, // output size info->cmprAlg, // compression algorithm - extraBuffer, // helper buffer - info->originalSize + COMP_OVERFLOW_BYTES // extra buffer size + buffer->data, // helper buffer + buffer->capacity // extra buffer size ); if (decompressedSize < 0) { - tBufferDestroy(&hBuffer); + tBufferDestroy(&local); return TSDB_CODE_COMPRESS_ERROR; } + ASSERT(decompressedSize == info->originalSize); - buffer->size += decompressedSize; - tBufferDestroy(&hBuffer); + tBufferDestroy(&local); } return 0; } + +int32_t tCompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist) { + int32_t code; + + code = tBufferEnsureCapacity(output, output->size + inputSize + COMP_OVERFLOW_BYTES); + if (code) return code; + + code = tCompressData(input, inputSize, info, tBufferGetDataEnd(output), output->capacity - output->size, assist); + if (code) return code; + + output->size += info->compressedSize; + return 0; +} + +int32_t tDecompressDataToBuffer(void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *output, + SBuffer *assist) { + int32_t code; + + code = tBufferEnsureCapacity(output, output->size + info->originalSize); + if (code) return code; + + code = tDecompressData(input, inputSize, info, tBufferGetDataEnd(output), output->capacity - output->size, assist); + if (code) return code; + + output->size += info->originalSize; + return 0; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index b49611bf02..7a5224ed41 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -612,6 +612,7 @@ struct SDataFileWriter { SSkmInfo skmTb[1]; SSkmInfo skmRow[1]; uint8_t *bufArr[5]; + SBuffer *buffers; struct { bool opened; @@ -802,7 +803,7 @@ int32_t tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxV } int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize, - TBrinBlkArray *brinBlkArray, uint8_t **bufArr, SVersionRange *range) { + TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range) { if (brinBlock->numOfRecords == 0) return 0; int32_t code; @@ -853,11 +854,11 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl ASSERT(0); } - tBufferClear(NULL /* TODO */); - code = tCompressData(tBufferGetData(bf), tBufferGetSize(bf), &info, NULL /* TODO */, NULL /* TODO*/); + tBufferClear(&buffers[0]); + code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffers[0].data, &buffers[1]); if (code) return code; - code = tsdbWriteFile(fd, *fileSize, NULL /* TODO */, info.compressedSize); + code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size); if (code) return code; brinBlk.size[i] = info.compressedSize; @@ -920,7 +921,7 @@ static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) { int32_t lino = 0; code = tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg, - &writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->config->bufArr, + &writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->buffers, &writer->ctx->range); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h index c4aed6e787..bc6491148f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h @@ -97,7 +97,7 @@ int32_t tsdbDataFileFlush(SDataFileWriter *writer); // head int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize, - TBrinBlkArray *brinBlkArray, uint8_t **bufArr, SVersionRange *range); + TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range); int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize); int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer); diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index e37c73d2af..e97b8343f3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -494,8 +494,8 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta .originalSize = statisBlk->numRec * sizeof(int64_t), }; - code = tDecompressData(tBufferGetDataAt(&reader->buffers[0], size), statisBlk->size[i], &info, - &statisBlock->buffers[i], &reader->buffers[1]); + code = tDecompressDataToBuffer(tBufferGetDataAt(&reader->buffers[0], size), statisBlk->size[i], &info, + &statisBlock->buffers[i], &reader->buffers[1]); TSDB_CHECK_CODE(code, lino, _exit); size += statisBlk->size[i]; } @@ -672,8 +672,8 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { }; tBufferClear(&writer->buffers[0]); - code = tCompressData(tBufferGetData(&statisBlock->buffers[i]), tBufferGetSize(&statisBlock->buffers[i]), &info, - &writer->buffers[0], &writer->buffers[1]); + code = tCompressDataToBuffer(tBufferGetData(&statisBlock->buffers[i]), tBufferGetSize(&statisBlock->buffers[i]), + &info, &writer->buffers[0], &writer->buffers[1]); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbWriteFile(writer->fd, writer->file->size, tBufferGetData(&writer->buffers[0]), info.compressedSize); diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index 4bf1d9586c..4710337e80 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -37,6 +37,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * int8_t cmprAlg; int32_t szPage; uint8_t *bufArr[8]; + SBuffer *buffers; // reader SArray *aBlockIdx; SMapData mDataBlk[1]; @@ -136,7 +137,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * if (ctx->brinBlock->numOfRecords >= ctx->maxRow) { SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, - ctx->brinBlkArray, ctx->bufArr, &range); + ctx->brinBlkArray, ctx->buffers, &range); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -145,7 +146,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * if (ctx->brinBlock->numOfRecords > 0) { SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, - ctx->brinBlkArray, ctx->bufArr, &range); + ctx->brinBlkArray, ctx->buffers, &range); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.c b/source/dnode/vnode/src/tsdb/tsdbUtil2.c index 058b3e0b2f..7afafe3e19 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.c @@ -357,7 +357,7 @@ int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buff ASSERT(0); } - code = tCompressData(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer); + code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer); if (code) return code; brinBlk->size[i] = info.compressedSize; brinBlk->dp[0].size += info.compressedSize;