more code

This commit is contained in:
Hongze Cheng 2024-02-27 15:10:42 +08:00
parent a90555c098
commit 8884c1ca77
2 changed files with 195 additions and 34 deletions

View File

@ -147,6 +147,21 @@ void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remov
int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, void *pMsgBuf);
// SColData ================================
// Descriptor of one compressed column, filled by tColDataCompress and read
// back by tColDataDecompress. A size of 0 means the corresponding part was
// not written (tColDataCompress zero-initializes the whole descriptor first).
typedef struct {
  int8_t  cmprAlg;               // compression algorithm used for every part
  int8_t  columnFlag;            // copied from colData->cflag
  int8_t  flag;                  // copied from colData->flag (HAS_NONE/HAS_NULL/HAS_VALUE combination)
  int8_t  dataType;              // copied from colData->type
  int16_t columnId;              // copied from colData->cid
  int32_t numOfValues;           // copied from colData->nVal
  int32_t bitmapOriginalSize;    // bitmap bytes before compression
  int32_t bitmapCompressedSize;  // bitmap bytes after compression
  int32_t offsetOriginalSize;    // offset-array bytes before compression (sizeof(int32_t) * nVal)
  int32_t offsetCompressedSize;  // offset-array bytes after compression
  int32_t dataOriginalSize;      // value bytes before compression (colData->nData)
  int32_t dataCompressedSize;    // value bytes after compression
} SColDataCompressInfo;
typedef void *(*xMallocFn)(void *, int32_t);
void tColDataDestroy(void *ph);
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t cflag);
@ -157,6 +172,10 @@ int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool for
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal);
int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg);
// Compress the bitmap, offset array and value data of `colData` (whichever parts are present) with `cmprAlg`.
// Each part is passed to tCompressData with `buffer` as the output and `helperBuffer` as the helper; `info` is
// reset and then filled with the column attributes and the original/compressed size of every part written.
// Returns 0 on success, otherwise the non-zero code returned by tCompressData.
int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo *info, SBuffer *buffer,
SBuffer *helperBuffer);
// Rebuild `colData` from `input` (`inputSize` bytes) using the descriptor `info`; `colData` is cleared first.
// Returns 0 on success, otherwise the non-zero code returned by tDecompressData.
// NOTE(review): the definition in this change still has TODOs for the offset and data parts.
int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompressInfo *info, SColData *colData,
SBuffer *helperBuffer);
extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull);
// for stmt bind
@ -319,9 +338,9 @@ typedef struct {
int32_t compressedSize; // fill by compress
} SCompressInfo;
int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprInfo, SBuffer *buffer,
int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *info, SBuffer *buffer,
SBuffer *helperBuffer);
int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *cmprInfo, SBuffer *buffer,
int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *buffer,
SBuffer *helperBuffer);
#endif

View File

@ -2635,6 +2635,148 @@ _exit:
return code;
}
// Compress one column into `buffer`.
//
// The parts are emitted in a fixed order -- bitmap, offset array, value data --
// and only the parts that exist for the column's flag/type are written. `info`
// is overwritten: column attributes are copied from `colData`, every size
// starts at 0, and the original/compressed size of each written part is
// recorded as it is produced.
//
//   colData      : column to compress; must hold at least one value (asserted)
//   cmprAlg      : algorithm forwarded to tCompressData for every part
//   info         : out, descriptor of what was written
//   buffer       : output buffer handed to tCompressData
//   helperBuffer : helper buffer handed to tCompressData
//
// Returns 0 on success, or the first non-zero code returned by tCompressData.
int32_t tColDataCompress(SColData *colData, int8_t cmprAlg, SColDataCompressInfo *info, SBuffer *buffer,
                         SBuffer *helperBuffer) {
  int32_t       rc;
  SCompressInfo part;  // per-part descriptor, re-initialized before each tCompressData call

  ASSERT(colData->nVal > 0);

  // All size fields not named here are zeroed by the compound literal.
  *info = (SColDataCompressInfo){
      .cmprAlg = cmprAlg,
      .columnFlag = colData->cflag,
      .flag = colData->flag,
      .dataType = colData->type,
      .columnId = colData->cid,
      .numOfValues = colData->nVal,
  };

  // A column that is entirely NONE or entirely NULL has no payload at all.
  if (colData->flag == HAS_NONE || colData->flag == HAS_NULL) return 0;

  // ---- bitmap: present for every flag combination except pure HAS_VALUE ----
  if (colData->flag != HAS_VALUE) {
    // All three states present -> BIT2_SIZE, otherwise BIT1_SIZE.
    info->bitmapOriginalSize = (colData->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) ? BIT2_SIZE(colData->nVal)
                                                                                   : BIT1_SIZE(colData->nVal);

    part = (SCompressInfo){.dataType = TSDB_DATA_TYPE_TINYINT, .cmprAlg = cmprAlg};
    rc = tCompressData(colData->pBitMap, info->bitmapOriginalSize, &part, buffer, helperBuffer);
    if (rc) return rc;
    info->bitmapCompressedSize = part.compressedSize;
  }

  // NONE|NULL columns carry a bitmap only; nothing else to write.
  if (colData->flag == (HAS_NONE | HAS_NULL)) return 0;

  // ---- offset array: variable-length types only, one int32_t per value ----
  if (IS_VAR_DATA_TYPE(colData->type)) {
    info->offsetOriginalSize = sizeof(int32_t) * colData->nVal;

    part = (SCompressInfo){.dataType = TSDB_DATA_TYPE_INT, .cmprAlg = cmprAlg};
    rc = tCompressData(colData->aOffset, info->offsetOriginalSize, &part, buffer, helperBuffer);
    if (rc) return rc;
    info->offsetCompressedSize = part.compressedSize;
  }

  // ---- value data: skipped when the column holds no value bytes ----
  if (colData->nData > 0) {
    info->dataOriginalSize = colData->nData;

    part = (SCompressInfo){.dataType = colData->type, .cmprAlg = cmprAlg};
    rc = tCompressData(colData->pData, info->dataOriginalSize, &part, buffer, helperBuffer);
    if (rc) return rc;
    info->dataCompressedSize = part.compressedSize;
  }

  return 0;
}
// Rebuild `colData` from a compressed column payload.
//
//   input / inputSize : compressed bytes; parts are consumed in the order
//                       bitmap, offset array, value data
//   info              : descriptor of the payload (expected to be the one
//                       filled by tColDataCompress)
//   colData           : out; cleared with tColDataClear, then its cid, type,
//                       cflag, nVal and flag are restored from `info`
//   helperBuffer      : forwarded to tDecompressData
//
// Returns 0 on success, or the non-zero code returned by tDecompressData.
//
// NOTE(review): this function is still work in progress:
//   * the bitmap is decompressed with a NULL output buffer, which
//     tDecompressData forwards to tBufferAppend / tBufferEnsureCapacity --
//     a real destination must be supplied before this path is usable;
//   * the offset and data parts are not decoded and do not advance `size`,
//     so the ASSERT at `_exit` fires for any payload that contains them;
//   * numOfNone/numOfNull/numOfValue are not derived for mixed-flag columns.
int32_t tColDataDecompress(void *input, int32_t inputSize, const SColDataCompressInfo *info, SColData *colData,
                           SBuffer *helperBuffer) {
  int32_t code = 0;
  int32_t size = 0;  // number of bytes of `input` consumed so far

  tColDataClear(colData);
  colData->cid = info->columnId;
  colData->type = info->dataType;
  colData->cflag = info->columnFlag;
  colData->nVal = info->numOfValues;
  colData->flag = info->flag;

  // Pure NONE / pure NULL columns have no payload.
  if (info->flag == HAS_NONE || info->flag == HAS_NULL) {
    goto _exit;
  }

  // bitmap
  if (info->bitmapOriginalSize > 0) {
    SCompressInfo cinfo = {
        .dataType = TSDB_DATA_TYPE_TINYINT,
        .cmprAlg = info->cmprAlg,
        .originalSize = info->bitmapOriginalSize,
        .compressedSize = info->bitmapCompressedSize,
    };

    code = tDecompressData((char *)input + size, info->bitmapCompressedSize, &cinfo, NULL /* TODO */, helperBuffer);
    if (code) return code;

    size += info->bitmapCompressedSize;
  }

  // NONE|NULL columns carry a bitmap only.
  if (info->flag == (HAS_NONE | HAS_NULL)) {
    goto _exit;
  }

  // offset
  if (info->offsetOriginalSize > 0) {
    // TODO
  }

  // data
  if (info->dataOriginalSize > 0) {
    // TODO
  }

_exit:
  // Every input byte must have been consumed by the sections above.
  ASSERT(inputSize == size);

  switch (colData->flag) {
    case HAS_NONE:
      colData->numOfNone = colData->nVal;
      break;
    case HAS_NULL:
      colData->numOfNull = colData->nVal;
      break;
    case HAS_VALUE:
      colData->numOfValue = colData->nVal;
      break;
    case (HAS_VALUE | HAS_NULL | HAS_NONE):
      // TODO: loop to get the number of each type from bit2 map
      break;
    default:
      // TODO: loop to get the number of each type from bit1 map
      // The explicit break is required: before C23 a label cannot be the
      // last item of a compound statement.
      break;
  }

  return 0;
}
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
char *data) {
int32_t code = 0;
@ -3979,12 +4121,12 @@ int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompre
return 0;
}
int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprInfo, SBuffer *buffer,
int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *info, SBuffer *buffer,
SBuffer *helperBuffer) {
cmprInfo->originalSize = inputSize;
info->originalSize = inputSize;
if (cmprInfo->cmprAlg == NO_COMPRESSION) {
cmprInfo->compressedSize = inputSize;
if (info->cmprAlg == NO_COMPRESSION) {
info->compressedSize = inputSize;
return tBufferAppend(buffer, input, inputSize);
} else {
SBuffer hBuffer;
@ -3996,7 +4138,7 @@ int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprI
int32_t code = tBufferEnsureCapacity(buffer, buffer->size + extraSizeNeeded);
if (code) return code;
if (cmprInfo->cmprAlg == TWO_STAGE_COMP) {
if (info->cmprAlg == TWO_STAGE_COMP) {
if (extraBuffer == NULL) {
extraBuffer = &hBuffer;
}
@ -4005,32 +4147,32 @@ int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprI
if (code) return code;
}
cmprInfo->compressedSize = tDataTypes[cmprInfo->dataType].compFunc( //
(void *)input, // input
inputSize, // input size
inputSize / tDataTypes[cmprInfo->dataType].bytes, // number of elements
tBufferGetDataEnd(buffer), // output
extraSizeNeeded, // output size
cmprInfo->cmprAlg, // compression algorithm
tBufferGetData(extraBuffer), // helper buffer
extraSizeNeeded // extra buffer size
info->compressedSize = tDataTypes[info->dataType].compFunc( //
(void *)input, // input
inputSize, // input size
inputSize / tDataTypes[info->dataType].bytes, // number of elements
tBufferGetDataEnd(buffer), // output
extraSizeNeeded, // output size
info->cmprAlg, // compression algorithm
tBufferGetData(extraBuffer), // helper buffer
extraSizeNeeded // extra buffer size
);
if (cmprInfo->compressedSize < 0) {
if (info->compressedSize < 0) {
tBufferDestroy(&hBuffer);
return TSDB_CODE_COMPRESS_ERROR;
}
buffer->size += cmprInfo->compressedSize;
buffer->size += info->compressedSize;
tBufferDestroy(&hBuffer);
}
return 0;
}
int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *cmprInfo, SBuffer *buffer,
int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *info, SBuffer *buffer,
SBuffer *helperBuffer) {
if (cmprInfo->cmprAlg == NO_COMPRESSION) {
ASSERT(inputSize == cmprInfo->originalSize);
if (info->cmprAlg == NO_COMPRESSION) {
ASSERT(inputSize == info->originalSize);
return tBufferAppend(buffer, input, inputSize);
} else {
SBuffer hBuffer;
@ -4039,32 +4181,32 @@ int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInf
tBufferInit(&hBuffer);
code = tBufferEnsureCapacity(buffer, cmprInfo->originalSize);
code = tBufferEnsureCapacity(buffer, info->originalSize);
if (code) return code;
if (cmprInfo->cmprAlg == TWO_STAGE_COMP) {
if (info->cmprAlg == TWO_STAGE_COMP) {
if (extraBuffer == NULL) {
extraBuffer = &hBuffer;
}
code = tBufferEnsureCapacity(extraBuffer, cmprInfo->originalSize + COMP_OVERFLOW_BYTES);
code = tBufferEnsureCapacity(extraBuffer, info->originalSize + COMP_OVERFLOW_BYTES);
if (code) return code;
}
int32_t decompressedSize = tDataTypes[cmprInfo->dataType].decompFunc(
(void *)input, // input
inputSize, // inputSize
cmprInfo->originalSize / tDataTypes[cmprInfo->dataType].bytes, // number of elements
tBufferGetDataEnd(buffer), // output
cmprInfo->originalSize, // output size
cmprInfo->cmprAlg, // compression algorithm
extraBuffer, // helper buffer
cmprInfo->originalSize + COMP_OVERFLOW_BYTES // extra buffer size
int32_t decompressedSize = tDataTypes[info->dataType].decompFunc(
(void *)input, // input
inputSize, // inputSize
info->originalSize / tDataTypes[info->dataType].bytes, // number of elements
tBufferGetDataEnd(buffer), // output
info->originalSize, // output size
info->cmprAlg, // compression algorithm
extraBuffer, // helper buffer
info->originalSize + COMP_OVERFLOW_BYTES // extra buffer size
);
if (decompressedSize < 0) {
tBufferDestroy(&hBuffer);
return TSDB_CODE_COMPRESS_ERROR;
}
ASSERT(decompressedSize == cmprInfo->originalSize);
ASSERT(decompressedSize == info->originalSize);
buffer->size += decompressedSize;
tBufferDestroy(&hBuffer);
}