more code

This commit is contained in:
Hongze Cheng 2024-02-28 10:58:42 +08:00
parent 7afafa985e
commit d9a4e5ed0f
7 changed files with 348 additions and 157 deletions

View File

@ -96,12 +96,12 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111
// SValueColumn ================================ // SValueColumn ================================
typedef struct { typedef struct {
int8_t cmprAlg; // filled by caller int8_t cmprAlg; // filled by caller
int8_t type; // filled by compress int8_t type;
int32_t originalDataSize; // filled by compress int32_t originalDataSize;
int32_t compressedDataSize; // filled by compress int32_t compressedDataSize;
int32_t originalOffsetSize; // filled by compress int32_t originalOffsetSize;
int32_t compressedOffsetSize; // filled by compress int32_t compressedOffsetSize;
} SValueColumnCompressInfo; } SValueColumnCompressInfo;
int32_t tValueColumnInit(SValueColumn *valCol); int32_t tValueColumnInit(SValueColumn *valCol);
@ -109,10 +109,9 @@ int32_t tValueColumnDestroy(SValueColumn *valCol);
int32_t tValueColumnClear(SValueColumn *valCol); int32_t tValueColumnClear(SValueColumn *valCol);
int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value); int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value);
int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value); int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value);
int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *compressInfo, SBuffer *buffer, int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *info, SBuffer *output, SBuffer *assist);
SBuffer *helperBuffer);
int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *compressInfo, int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *compressInfo,
SValueColumn *valCol, SBuffer *helperBuffer); SValueColumn *valCol, SBuffer *buffer);
int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer); int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer);
int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo); int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo);
int32_t tValueCompare(const SValue *tv1, const SValue *tv2); int32_t tValueCompare(const SValue *tv1, const SValue *tv2);
@ -148,12 +147,12 @@ int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, voi
// SColData ================================ // SColData ================================
typedef struct { typedef struct {
int8_t cmprAlg; int8_t cmprAlg; // filled by caller
int8_t columnFlag; int8_t columnFlag;
int8_t flag; int8_t flag;
int8_t dataType; int8_t dataType;
int16_t columnId; int16_t columnId;
int32_t numOfValues; int32_t numOfData;
int32_t bitmapOriginalSize; int32_t bitmapOriginalSize;
int32_t bitmapCompressedSize; int32_t bitmapCompressedSize;
int32_t offsetOriginalSize; int32_t offsetOriginalSize;
@ -172,8 +171,8 @@ int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool for
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal); uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal);
int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg); int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg);
int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo *info, SBuffer *buffer, int32_t tColDataCompress(SColData *colData, SColDataCompressInfo *info, void *output, int32_t outputSize,
SBuffer *helperBuffer); SBuffer *buffer);
int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompressInfo *info, SColData *colData, int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompressInfo *info, SColData *colData,
SBuffer *helperBuffer); SBuffer *helperBuffer);
extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull); extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull);
@ -332,16 +331,29 @@ struct SValueColumn {
}; };
typedef struct { typedef struct {
int8_t dataType; // fill by caller int8_t dataType; // fill by caller
int8_t cmprAlg; // fill by caller int8_t cmprAlg; // fill by caller
int32_t originalSize; // fill by compress int32_t originalSize;
int32_t compressedSize; // fill by compress int32_t compressedSize;
} SCompressInfo; } SCompressInfo;
int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *info, SBuffer *buffer, int32_t tCompressData(void *input, // input
SBuffer *helperBuffer); int32_t inputSize, // input size
int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *buffer, SCompressInfo *info, // compress info
SBuffer *helperBuffer); void *output, // output
int32_t outputSize, // output size
SBuffer *buffer // assistant buffer provided by caller, can be NULL
);
int32_t tDecompressData(void *input, // input
int32_t inputSize, // input size
const SCompressInfo *info, // compress info
void *output, // output
int32_t outputSize, // output size
SBuffer *buffer // assistant buffer provided by caller, can be NULL
);
int32_t tCompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist);
int32_t tDecompressDataToBuffer(void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *output,
SBuffer *assist);
#endif #endif

View File

@ -2635,25 +2635,38 @@ _exit:
return code; return code;
} }
int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo *info, SBuffer *buffer, int32_t tColDataCompress(SColData *colData, // column data
SBuffer *helperBuffer) { SColDataCompressInfo *info, // compress info
void *output, // output
int32_t outputSize, // output size
SBuffer *buffer // assistant buffer
) {
int32_t code; int32_t code;
SBuffer local;
char *outputStart = output;
ASSERT(colData->nVal > 0); ASSERT(colData->nVal > 0);
tBufferInit(&local);
(*info) = (SColDataCompressInfo){ (*info) = (SColDataCompressInfo){
.cmprAlg = cmprAlg, .cmprAlg = info->cmprAlg,
.columnFlag = colData->cflag, .columnFlag = colData->cflag,
.flag = colData->flag, .flag = colData->flag,
.dataType = colData->type, .dataType = colData->type,
.columnId = colData->cid, .columnId = colData->cid,
.numOfValues = colData->nVal, .numOfData = colData->nVal,
}; };
if (colData->flag == HAS_NONE || colData->flag == HAS_NULL) { if (colData->flag == HAS_NONE || colData->flag == HAS_NULL) {
tBufferDestroy(&local);
return 0; return 0;
} }
if (buffer == NULL) {
buffer = &local;
}
// bitmap // bitmap
if (colData->flag != HAS_VALUE) { if (colData->flag != HAS_VALUE) {
if (colData->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) { if (colData->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) {
@ -2664,32 +2677,43 @@ int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo
SCompressInfo cinfo = { SCompressInfo cinfo = {
.dataType = TSDB_DATA_TYPE_TINYINT, .dataType = TSDB_DATA_TYPE_TINYINT,
.cmprAlg = cmprAlg, .cmprAlg = info->cmprAlg,
}; };
code = tCompressData(colData->pBitMap, info->bitmapOriginalSize, &cinfo, buffer, helperBuffer); code = tCompressData(colData->pBitMap, info->bitmapOriginalSize, &cinfo, outputStart, outputSize, buffer);
if (code) return code; if (code) {
tBufferDestroy(&local);
return code;
}
info->bitmapCompressedSize = cinfo.compressedSize; info->bitmapCompressedSize = cinfo.compressedSize;
outputStart += cinfo.compressedSize;
outputSize -= cinfo.compressedSize;
} }
if (colData->flag == (HAS_NONE | HAS_NULL)) { if (colData->flag == (HAS_NONE | HAS_NULL)) {
tBufferDestroy(&local);
return 0; return 0;
} }
// offset // offset
if (IS_VAR_DATA_TYPE(colData->type)) { if (IS_VAR_DATA_TYPE(colData->type)) {
info->offsetOriginalSize = sizeof(int32_t) * colData->nVal; info->offsetOriginalSize = sizeof(int32_t) * info->numOfData;
SCompressInfo cinfo = { SCompressInfo cinfo = {
.dataType = TSDB_DATA_TYPE_INT, .dataType = TSDB_DATA_TYPE_INT,
.cmprAlg = cmprAlg, .cmprAlg = info->cmprAlg,
}; };
code = tCompressData(colData->aOffset, info->offsetOriginalSize, &cinfo, buffer, helperBuffer); code = tCompressData(colData->aOffset, info->offsetOriginalSize, &cinfo, outputStart, outputSize, buffer);
if (code) return code; if (code) {
tBufferDestroy(&local);
return code;
}
info->offsetCompressedSize = cinfo.compressedSize; info->offsetCompressedSize = cinfo.compressedSize;
outputStart += cinfo.compressedSize;
outputSize -= cinfo.compressedSize;
} }
// data // data
@ -2698,29 +2722,42 @@ int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo
SCompressInfo cinfo = { SCompressInfo cinfo = {
.dataType = colData->type, .dataType = colData->type,
.cmprAlg = cmprAlg, .cmprAlg = info->cmprAlg,
}; };
code = tCompressData(colData->pData, info->dataOriginalSize, &cinfo, buffer, helperBuffer); code = tCompressData(colData->pData, info->dataOriginalSize, &cinfo, outputStart, outputSize, buffer);
if (code) return code; if (code) {
tBufferDestroy(&local);
return code;
}
info->dataCompressedSize = cinfo.compressedSize; info->dataCompressedSize = cinfo.compressedSize;
outputStart += cinfo.compressedSize;
outputSize -= cinfo.compressedSize;
} }
tBufferDestroy(&local);
return 0; return 0;
} }
int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompressInfo *info, SColData *colData, int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompressInfo *info, SColData *colData,
SBuffer *helperBuffer) { SBuffer *buffer) {
int32_t code; int32_t code;
int32_t size = 0; SBuffer local;
char *inputStart = input;
ASSERT(inputSize == info->bitmapCompressedSize + info->offsetCompressedSize + info->dataCompressedSize);
tBufferInit(&local);
if (buffer == NULL) {
buffer = &local;
}
tColDataClear(colData); tColDataClear(colData);
colData->cid = info->columnId; colData->cid = info->columnId;
colData->type = info->dataType; colData->type = info->dataType;
colData->cflag = info->columnFlag; colData->cflag = info->columnFlag;
colData->nVal = info->numOfValues; colData->nVal = info->numOfData;
colData->flag = info->flag; colData->flag = info->flag;
if (info->flag == HAS_NONE || info->flag == HAS_NULL) { if (info->flag == HAS_NONE || info->flag == HAS_NULL) {
@ -2736,10 +2773,19 @@ int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompres
.compressedSize = info->bitmapCompressedSize, .compressedSize = info->bitmapCompressedSize,
}; };
code = tDecompressData((char *)input + size, info->bitmapCompressedSize, &cinfo, NULL /* TODO */, helperBuffer); code = tRealloc(&colData->pBitMap, cinfo.originalSize);
if (code) return code; if (code) {
tBufferDestroy(&local);
return code;
}
size += info->bitmapCompressedSize; code = tDecompressData(inputStart, cinfo.compressedSize, &cinfo, colData->pBitMap, cinfo.originalSize, buffer);
if (code) {
tBufferDestroy(&local);
return code;
}
inputStart += cinfo.compressedSize;
} }
if (info->flag == (HAS_NONE | HAS_NULL)) { if (info->flag == (HAS_NONE | HAS_NULL)) {
@ -2748,16 +2794,55 @@ int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompres
// offset // offset
if (info->offsetOriginalSize > 0) { if (info->offsetOriginalSize > 0) {
// TODO SCompressInfo cinfo = {
.cmprAlg = info->cmprAlg,
.dataType = TSDB_DATA_TYPE_INT,
.originalSize = info->offsetOriginalSize,
.compressedSize = info->offsetCompressedSize,
};
code = tRealloc((uint8_t **)&colData->aOffset, cinfo.originalSize);
if (code) {
tBufferDestroy(&local);
return code;
}
code = tDecompressData(inputStart, cinfo.compressedSize, &cinfo, colData->aOffset, cinfo.originalSize, buffer);
if (code) {
tBufferDestroy(&local);
return code;
}
inputStart += cinfo.compressedSize;
} }
// data // data
if (info->dataOriginalSize > 0) { if (info->dataOriginalSize > 0) {
// TODO colData->nData = info->dataOriginalSize;
SCompressInfo cinfo = {
.cmprAlg = info->cmprAlg,
.dataType = colData->type,
.originalSize = info->dataOriginalSize,
.compressedSize = info->dataCompressedSize,
};
code = tRealloc((uint8_t **)&colData->pData, cinfo.originalSize);
if (code) {
tBufferDestroy(&local);
return code;
}
code = tDecompressData(inputStart, cinfo.compressedSize, &cinfo, colData->pData, cinfo.originalSize, buffer);
if (code) {
tBufferDestroy(&local);
return code;
}
inputStart += cinfo.compressedSize;
} }
_exit: _exit:
ASSERT(inputSize == size);
switch (colData->flag) { switch (colData->flag) {
case HAS_NONE: case HAS_NONE:
colData->numOfNone = colData->nVal; colData->numOfNone = colData->nVal;
@ -2768,13 +2853,19 @@ _exit:
case HAS_VALUE: case HAS_VALUE:
colData->numOfValue = colData->nVal; colData->numOfValue = colData->nVal;
break; break;
case (HAS_VALUE | HAS_NULL | HAS_NONE):
// TODO: loop to get the number of each type from bit2 map
break;
default: default:
ASSERT(0); for (int32_t i = 0; i < colData->nVal; i++) {
// TODO: loop to get the number of each type from bit1 map uint8_t bitValue = tColDataGetBitValue(colData, i);
if (bitValue == 0) {
colData->numOfNone++;
} else if (bitValue == 1) {
colData->numOfNull++;
} else {
colData->numOfValue++;
}
}
} }
tBufferDestroy(&local);
return 0; return 0;
} }
@ -4003,69 +4094,122 @@ int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value) {
return 0; return 0;
} }
int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *compressInfo, SBuffer *buffer, int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *info, SBuffer *output, SBuffer *assist) {
SBuffer *helperBuffer) { int32_t code;
int32_t code; SBuffer local;
SCompressInfo info;
compressInfo->type = valCol->type; (*info) = (SValueColumnCompressInfo){
.cmprAlg = info->cmprAlg,
.type = valCol->type,
};
if (IS_VAR_DATA_TYPE(valCol->type)) { tBufferInit(&local);
info.dataType = TSDB_DATA_TYPE_INT; if (assist == NULL) {
info.cmprAlg = compressInfo->cmprAlg; assist = &local;
code =
tCompressData(tBufferGetData(&valCol->offsets), tBufferGetSize(&valCol->offsets), &info, buffer, helperBuffer);
if (code) return code;
compressInfo->originalOffsetSize = info.originalSize;
compressInfo->compressedOffsetSize = info.compressedSize;
} else {
compressInfo->originalOffsetSize = 0;
compressInfo->compressedOffsetSize = 0;
} }
info.dataType = valCol->type; // offset
info.cmprAlg = compressInfo->cmprAlg; if (IS_VAR_DATA_TYPE(valCol->type)) {
code = tCompressData(tBufferGetData(&valCol->data), tBufferGetSize(&valCol->data), &info, buffer, helperBuffer); SCompressInfo cinfo = {
if (code) return code; .cmprAlg = info->cmprAlg,
compressInfo->originalDataSize = info.originalSize; .dataType = TSDB_DATA_TYPE_INT,
compressInfo->compressedDataSize = info.compressedSize; };
code = tCompressDataToBuffer(valCol->offsets.data, valCol->offsets.size, &cinfo, output, assist);
if (code) {
tBufferDestroy(&local);
return code;
}
info->originalOffsetSize = cinfo.originalSize;
info->compressedOffsetSize = cinfo.compressedSize;
}
// data
SCompressInfo cinfo = {
.cmprAlg = info->cmprAlg,
.dataType = valCol->type,
};
code = tCompressDataToBuffer(valCol->data.data, valCol->data.size, &cinfo, output, assist);
if (code) {
tBufferGetData(&local);
return code;
}
info->originalDataSize = cinfo.originalSize;
info->compressedDataSize = cinfo.compressedSize;
tBufferDestroy(&local);
return 0; return 0;
} }
int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *compressInfo, int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *info,
SValueColumn *valCol, SBuffer *helperBuffer) { SValueColumn *valCol, SBuffer *buffer) {
int32_t code; int32_t code;
SCompressInfo info; SBuffer local;
char *inputStart = input;
ASSERT(inputSize == info->compressedOffsetSize + info->compressedDataSize);
tValueColumnClear(valCol); tValueColumnClear(valCol);
tBufferInit(&local);
valCol->type = compressInfo->type; if (buffer == NULL) {
if (IS_VAR_DATA_TYPE(valCol->type)) { buffer = &local;
ASSERT(inputSize == compressInfo->compressedOffsetSize + compressInfo->compressedDataSize);
info.dataType = TSDB_DATA_TYPE_INT;
info.cmprAlg = compressInfo->cmprAlg;
info.originalSize = compressInfo->originalOffsetSize;
info.compressedSize = compressInfo->compressedOffsetSize;
code = tDecompressData(input, compressInfo->compressedOffsetSize, &info, &valCol->offsets, helperBuffer);
if (code) return code;
valCol->numOfValues = compressInfo->originalOffsetSize / tDataTypes[TSDB_DATA_TYPE_INT].bytes;
} else {
ASSERT(inputSize == compressInfo->compressedDataSize);
valCol->numOfValues = compressInfo->originalDataSize / tDataTypes[valCol->type].bytes;
} }
info.dataType = valCol->type; valCol->type = info->type;
info.cmprAlg = compressInfo->cmprAlg; // offset
info.originalSize = compressInfo->originalDataSize; if (IS_VAR_DATA_TYPE(valCol->type)) {
info.compressedSize = compressInfo->compressedDataSize; valCol->numOfValues = info->originalOffsetSize / tDataTypes[TSDB_DATA_TYPE_INT].bytes;
code = tDecompressData((char *)input + compressInfo->compressedOffsetSize, compressInfo->compressedDataSize, &info,
&valCol->data, helperBuffer);
if (code) return code;
SCompressInfo cinfo = {
.dataType = TSDB_DATA_TYPE_INT,
.cmprAlg = info->cmprAlg,
.originalSize = info->originalOffsetSize,
.compressedSize = info->compressedOffsetSize,
};
code = tBufferEnsureCapacity(&valCol->offsets, cinfo.originalSize);
if (code) {
tBufferDestroy(&local);
return code;
}
code = tDecompressData(inputStart, cinfo.compressedSize, &cinfo, valCol->offsets.data, cinfo.originalSize, buffer);
if (code) {
tBufferDestroy(&local);
return code;
}
valCol->offsets.size = cinfo.originalSize;
inputStart += cinfo.compressedSize;
} else {
valCol->numOfValues = info->originalDataSize / tDataTypes[valCol->type].bytes;
}
// data
SCompressInfo cinfo = {
.dataType = valCol->type,
.cmprAlg = info->cmprAlg,
.originalSize = info->originalDataSize,
.compressedSize = info->compressedDataSize,
};
code = tBufferEnsureCapacity(&valCol->data, cinfo.originalSize);
if (code) {
tBufferDestroy(&local);
return code;
}
code = tDecompressData(inputStart, cinfo.compressedSize, &cinfo, valCol->data.data, cinfo.originalSize, buffer);
if (code) {
tBufferDestroy(&local);
return code;
}
valCol->data.size = cinfo.originalSize;
inputStart += cinfo.compressedSize;
tBufferDestroy(&local);
return 0; return 0;
} }
@ -4122,95 +4266,128 @@ int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompre
return 0; return 0;
} }
int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *info, SBuffer *buffer, int32_t tCompressData(void *input, // input
SBuffer *helperBuffer) { int32_t inputSize, // input size
SCompressInfo *info, // compress info
void *output, // output
int32_t outputSize, // output size
SBuffer *buffer // assistant buffer provided by caller, can be NULL
) {
int32_t extraSizeNeeded;
int32_t code;
extraSizeNeeded = (info->cmprAlg == NO_COMPRESSION) ? inputSize : inputSize + COMP_OVERFLOW_BYTES;
ASSERT(outputSize >= extraSizeNeeded);
info->originalSize = inputSize; info->originalSize = inputSize;
if (info->cmprAlg == NO_COMPRESSION) { if (info->cmprAlg == NO_COMPRESSION) {
memcpy(output, input, inputSize);
info->compressedSize = inputSize; info->compressedSize = inputSize;
return tBufferAppend(buffer, input, inputSize);
} else { } else {
SBuffer hBuffer; SBuffer local;
SBuffer *extraBuffer = helperBuffer;
int32_t extraSizeNeeded = inputSize + COMP_OVERFLOW_BYTES;
tBufferInit(&hBuffer);
int32_t code = tBufferEnsureCapacity(buffer, buffer->size + extraSizeNeeded);
if (code) return code;
tBufferInit(&local);
if (info->cmprAlg == TWO_STAGE_COMP) { if (info->cmprAlg == TWO_STAGE_COMP) {
if (extraBuffer == NULL) { if (buffer == NULL) {
extraBuffer = &hBuffer; buffer = &local;
} }
code = tBufferEnsureCapacity(extraBuffer, extraSizeNeeded); code = tBufferEnsureCapacity(buffer, extraSizeNeeded);
if (code) return code; if (code) return code;
} }
info->compressedSize = tDataTypes[info->dataType].compFunc( // info->compressedSize = tDataTypes[info->dataType].compFunc( //
(void *)input, // input input, // input
inputSize, // input size inputSize, // input size
inputSize / tDataTypes[info->dataType].bytes, // number of elements inputSize / tDataTypes[info->dataType].bytes, // number of elements
tBufferGetDataEnd(buffer), // output output, // output
extraSizeNeeded, // output size outputSize, // output size
info->cmprAlg, // compression algorithm info->cmprAlg, // compression algorithm
tBufferGetData(extraBuffer), // helper buffer buffer->data, // buffer
extraSizeNeeded // extra buffer size extraSizeNeeded // buffer size
); );
if (info->compressedSize < 0) { if (info->compressedSize < 0) {
tBufferDestroy(&hBuffer); tBufferDestroy(&local);
return TSDB_CODE_COMPRESS_ERROR; return TSDB_CODE_COMPRESS_ERROR;
} }
buffer->size += info->compressedSize; tBufferDestroy(&local);
tBufferDestroy(&hBuffer);
} }
return 0; return 0;
} }
int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *buffer, int32_t tDecompressData(void *input, // input
SBuffer *helperBuffer) { int32_t inputSize, // input size
const SCompressInfo *info, // compress info
void *output, // output
int32_t outputSize, // output size
SBuffer *buffer // assistant buffer provided by caller, can be NULL
) {
int32_t code;
ASSERT(outputSize >= info->originalSize);
if (info->cmprAlg == NO_COMPRESSION) { if (info->cmprAlg == NO_COMPRESSION) {
ASSERT(inputSize == info->originalSize); ASSERT(inputSize == info->originalSize);
return tBufferAppend(buffer, input, inputSize); memcpy(output, input, inputSize);
} else { } else {
SBuffer hBuffer; SBuffer local;
SBuffer *extraBuffer = helperBuffer;
int32_t code;
tBufferInit(&hBuffer);
code = tBufferEnsureCapacity(buffer, info->originalSize);
if (code) return code;
tBufferInit(&local);
if (info->cmprAlg == TWO_STAGE_COMP) { if (info->cmprAlg == TWO_STAGE_COMP) {
if (extraBuffer == NULL) { if (buffer == NULL) {
extraBuffer = &hBuffer; buffer = &local;
} }
code = tBufferEnsureCapacity(extraBuffer, info->originalSize + COMP_OVERFLOW_BYTES); code = tBufferEnsureCapacity(buffer, info->originalSize + COMP_OVERFLOW_BYTES);
if (code) return code; if (code) return code;
} }
int32_t decompressedSize = tDataTypes[info->dataType].decompFunc( int32_t decompressedSize = tDataTypes[info->dataType].decompFunc(
(void *)input, // input input, // input
inputSize, // inputSize inputSize, // inputSize
info->originalSize / tDataTypes[info->dataType].bytes, // number of elements info->originalSize / tDataTypes[info->dataType].bytes, // number of elements
tBufferGetDataEnd(buffer), // output output, // output
info->originalSize, // output size outputSize, // output size
info->cmprAlg, // compression algorithm info->cmprAlg, // compression algorithm
extraBuffer, // helper buffer buffer->data, // helper buffer
info->originalSize + COMP_OVERFLOW_BYTES // extra buffer size buffer->capacity // extra buffer size
); );
if (decompressedSize < 0) { if (decompressedSize < 0) {
tBufferDestroy(&hBuffer); tBufferDestroy(&local);
return TSDB_CODE_COMPRESS_ERROR; return TSDB_CODE_COMPRESS_ERROR;
} }
ASSERT(decompressedSize == info->originalSize); ASSERT(decompressedSize == info->originalSize);
buffer->size += decompressedSize; tBufferDestroy(&local);
tBufferDestroy(&hBuffer);
} }
return 0; return 0;
} }
int32_t tCompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist) {
int32_t code;
code = tBufferEnsureCapacity(output, output->size + inputSize + COMP_OVERFLOW_BYTES);
if (code) return code;
code = tCompressData(input, inputSize, info, tBufferGetDataEnd(output), output->capacity - output->size, assist);
if (code) return code;
output->size += info->compressedSize;
return 0;
}
int32_t tDecompressDataToBuffer(void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *output,
SBuffer *assist) {
int32_t code;
code = tBufferEnsureCapacity(output, output->size + info->originalSize);
if (code) return code;
code = tDecompressData(input, inputSize, info, tBufferGetDataEnd(output), output->capacity - output->size, assist);
if (code) return code;
output->size += info->originalSize;
return 0;
}

View File

@ -612,6 +612,7 @@ struct SDataFileWriter {
SSkmInfo skmTb[1]; SSkmInfo skmTb[1];
SSkmInfo skmRow[1]; SSkmInfo skmRow[1];
uint8_t *bufArr[5]; uint8_t *bufArr[5];
SBuffer *buffers;
struct { struct {
bool opened; bool opened;
@ -802,7 +803,7 @@ int32_t tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxV
} }
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize, int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize,
TBrinBlkArray *brinBlkArray, uint8_t **bufArr, SVersionRange *range) { TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range) {
if (brinBlock->numOfRecords == 0) return 0; if (brinBlock->numOfRecords == 0) return 0;
int32_t code; int32_t code;
@ -853,11 +854,11 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
ASSERT(0); ASSERT(0);
} }
tBufferClear(NULL /* TODO */); tBufferClear(&buffers[0]);
code = tCompressData(tBufferGetData(bf), tBufferGetSize(bf), &info, NULL /* TODO */, NULL /* TODO*/); code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffers[0].data, &buffers[1]);
if (code) return code; if (code) return code;
code = tsdbWriteFile(fd, *fileSize, NULL /* TODO */, info.compressedSize); code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size);
if (code) return code; if (code) return code;
brinBlk.size[i] = info.compressedSize; brinBlk.size[i] = info.compressedSize;
@ -920,7 +921,7 @@ static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
int32_t lino = 0; int32_t lino = 0;
code = tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg, code = tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg,
&writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->config->bufArr, &writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->buffers,
&writer->ctx->range); &writer->ctx->range);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -97,7 +97,7 @@ int32_t tsdbDataFileFlush(SDataFileWriter *writer);
// head // head
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize, int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize,
TBrinBlkArray *brinBlkArray, uint8_t **bufArr, SVersionRange *range); TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range);
int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize); int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize);
int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer); int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer);

View File

@ -494,8 +494,8 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta
.originalSize = statisBlk->numRec * sizeof(int64_t), .originalSize = statisBlk->numRec * sizeof(int64_t),
}; };
code = tDecompressData(tBufferGetDataAt(&reader->buffers[0], size), statisBlk->size[i], &info, code = tDecompressDataToBuffer(tBufferGetDataAt(&reader->buffers[0], size), statisBlk->size[i], &info,
&statisBlock->buffers[i], &reader->buffers[1]); &statisBlock->buffers[i], &reader->buffers[1]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
size += statisBlk->size[i]; size += statisBlk->size[i];
} }
@ -672,8 +672,8 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
}; };
tBufferClear(&writer->buffers[0]); tBufferClear(&writer->buffers[0]);
code = tCompressData(tBufferGetData(&statisBlock->buffers[i]), tBufferGetSize(&statisBlock->buffers[i]), &info, code = tCompressDataToBuffer(tBufferGetData(&statisBlock->buffers[i]), tBufferGetSize(&statisBlock->buffers[i]),
&writer->buffers[0], &writer->buffers[1]); &info, &writer->buffers[0], &writer->buffers[1]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(writer->fd, writer->file->size, tBufferGetData(&writer->buffers[0]), info.compressedSize); code = tsdbWriteFile(writer->fd, writer->file->size, tBufferGetData(&writer->buffers[0]), info.compressedSize);

View File

@ -37,6 +37,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
int8_t cmprAlg; int8_t cmprAlg;
int32_t szPage; int32_t szPage;
uint8_t *bufArr[8]; uint8_t *bufArr[8];
SBuffer *buffers;
// reader // reader
SArray *aBlockIdx; SArray *aBlockIdx;
SMapData mDataBlk[1]; SMapData mDataBlk[1];
@ -136,7 +137,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
if (ctx->brinBlock->numOfRecords >= ctx->maxRow) { if (ctx->brinBlock->numOfRecords >= ctx->maxRow) {
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
ctx->brinBlkArray, ctx->bufArr, &range); ctx->brinBlkArray, ctx->buffers, &range);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
@ -145,7 +146,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
if (ctx->brinBlock->numOfRecords > 0) { if (ctx->brinBlock->numOfRecords > 0) {
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
ctx->brinBlkArray, ctx->bufArr, &range); ctx->brinBlkArray, ctx->buffers, &range);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }

View File

@ -357,7 +357,7 @@ int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buff
ASSERT(0); ASSERT(0);
} }
code = tCompressData(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer); code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer);
if (code) return code; if (code) return code;
brinBlk->size[i] = info.compressedSize; brinBlk->size[i] = info.compressedSize;
brinBlk->dp[0].size += info.compressedSize; brinBlk->dp[0].size += info.compressedSize;